[ 
https://issues.apache.org/activemq/browse/AMQ-1845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=44408#action_44408
 ] 

silverhoof edited comment on AMQ-1845 at 7/21/08 12:20 AM:
-----------------------------------------------------------

This test can reproduce the message loss using ActiveMQ 5.0 and 5.1.

The producer side has its own broker and a fixed IP address.
On the producer side, the Spring configuration is as follows:
[code]
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jee="http://www.springframework.org/schema/jee"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

        <amq:broker brokerName="producer" useJmx="true" persistent="true">
                <amq:transportConnectors>
                        <amq:transportConnector uri="tcp://localhost:61616" />
                </amq:transportConnectors>
        </amq:broker>

        <!-- Jms ConnectionFactory -->
        <bean id="jmsFactory"
                class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://localhost:61616" />
        </bean>


        <!-- Spring JMS SimpleConverter -->
        <bean id="simpleConverter"
                class="org.springframework.jms.support.converter.SimpleMessageConverter" />

        <!-- JMS Queue Template -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                <property name="connectionFactory" ref="jmsFactory" />
                <property name="messageConverter" ref="converter" />
        </bean>

        <!-- Message Converter -->
        <bean id="converter" class="com.al.SimpleConverter">
                <property name="converter">
                        <ref local="simpleConverter" />
                </property>
        </bean>

        <!-- Message producer -->
        <bean id="sender" class="com.al.DefaultSender">
                <property name="template" ref="jmsTemplate" />
                <property name="destinationName" value="test-out" />
        </bean>

</beans>
[/code]
The test application on the producer side:
package com.al;

import org.apache.log4j.Logger;

public class TestApp {
        /**
         * Logger for this class
         */
        private static final Logger logger = Logger.getLogger(TestApp.class);

        public static void main(String[] args) {

                logger.debug("start test...");
                //Initializing spring context
                Context.init();
                // Send the test messages

                DefaultSender sender = Context.getBean("sender");
                int idx = 1;
                int count = 3000;
                while (idx <= count) {
                        sender.sendMessage(SimpleMessageHelper.genSimpleMessage(idx));
                        logger.debug("send out message : payload is " + idx);
                        idx++;
                }

                /* Block the main thread indefinitely to keep the Spring context running */
                Object lock = new Object();
                synchronized (lock) {
                        try {
                                lock.wait();
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }

        }
}

The test client is very simple: it sends 3000 messages to the queue "test-out" 
on the producer-side broker.
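
SimpleMessage and SimpleMessageHelper are not included in this comment. For anyone trying to rerun the test, here is a minimal sketch of what they might look like. Everything in it is an assumption inferred from how the classes are used: the listener further down parses getPayLoad() with Integer.valueOf, so the payload is taken to be the index as a String, and the class is made Serializable on the guess that the converter sends it as an object message. The real com.al classes may differ.

```java
import java.io.Serializable;

// Hypothetical stand-in for com.al.SimpleMessage (not shown in the report).
final class SimpleMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    // The message index, stored as text because the listener parses it back to an int.
    private final String payLoad;

    SimpleMessage(String payLoad) {
        this.payLoad = payLoad;
    }

    // Method name spelled as in the listener's call to getPayLoad().
    public String getPayLoad() {
        return payLoad;
    }
}

// Hypothetical stand-in for com.al.SimpleMessageHelper (not shown in the report).
final class SimpleMessageHelper {
    private SimpleMessageHelper() {
    }

    // Builds a message whose payload is the decimal form of idx.
    static SimpleMessage genSimpleMessage(int idx) {
        return new SimpleMessage(String.valueOf(idx));
    }
}
```

With this shape, genSimpleMessage(idx) in the loop above produces payloads "1" through "3000", which is what the consumer-side check relies on.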

============================================================================================

The consumer side uses a dynamic IP address and is deployed on another machine.
Its broker runs on that separate machine and connects to the producer broker 
through a network connector with duplex="true".
The consumer-side Spring configuration file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jee="http://www.springframework.org/schema/jee"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

        <amq:broker brokerName="brokerA" useJmx="true" persistent="true">

                <amq:networkConnectors>
                        <amq:networkConnector
                                uri="static://(tcp://【ip-for-producer-machine】:61616)" duplex="true"/>
                </amq:networkConnectors>

                <amq:transportConnectors>
                        <amq:transportConnector uri="tcp://localhost:61616" />
                </amq:transportConnectors>

        </amq:broker>

        <!-- Jms ConnectionFactory -->

        <bean id="jmsFactory"
                class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://localhost:61616" />
        </bean>

        <!-- Spring JMS SimpleConverter -->
        <bean id="simpleConverter"
                class="org.springframework.jms.support.converter.SimpleMessageConverter" />


        <!-- Message Converter -->
        <bean id="converter" class="com.al.SimpleConverter">
                <property name="converter">
                        <ref local="simpleConverter" />
                </property>
        </bean>

        <!-- MDP -->
        <!-- consumer 1 -->
        <bean id="listener"
                class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
                <constructor-arg>
                        <bean class="com.al.DefaultListener" />
                </constructor-arg>
                <property name="defaultListenerMethod" value="onMessage" />
                <property name="messageConverter" ref="converter" />
        </bean>

        <bean id="listenerContainer"
                class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <property name="connectionFactory" ref="jmsFactory" />
                <property name="destinationName" value="test-out" />
                <property name="messageListener" ref="listener" />
                <property name="sessionTransacted" value="true" />
        </bean>
</beans>

Default Listener code:
package com.al;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DefaultListener {

        private static Log logger = LogFactory.getLog(DefaultListener.class);

        // Index expected for the next message; only safe with a single consumer thread
        static int i = 1;

        public void onMessage(SimpleMessage message) {
                int j = Integer.parseInt(message.getPayLoad());
                logger.debug("receive : j = " + j);
                if (i != j) {
                        logger.debug("warn : i=" + i + ",j=" + j);
                }
                i++;
        }
}

The default listener logs each message and compares the index in its payload 
with the number of messages received so far. The two should match if no message 
is lost; once a message is lost, every later message logs a mismatch.
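
Because the counter check above only shows that something went missing, not what, a stricter check can help when analysing a run. The following is a minimal sketch, not part of the original test: SequenceGapTracker is a hypothetical helper that remembers which payload indices arrived and can list the missing ones afterwards. It assumes indices run from 1 to a known total (3000 in this test).

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper: remembers which payload indices in 1..expected have arrived.
final class SequenceGapTracker {
    private final int expected;
    private final BitSet seen;
    private int duplicates;

    SequenceGapTracker(int expected) {
        this.expected = expected;
        this.seen = new BitSet(expected + 1);
    }

    // Synchronized so the tracker stays correct if messages arrive on several threads.
    synchronized void record(int index) {
        if (index < 1 || index > expected) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        if (seen.get(index)) {
            duplicates++; // redelivery of an index that was already recorded
        } else {
            seen.set(index);
        }
    }

    // Indices in 1..expected that were never recorded, in ascending order.
    synchronized List<Integer> missing() {
        List<Integer> result = new ArrayList<>();
        for (int i = seen.nextClearBit(1); i <= expected; i = seen.nextClearBit(i + 1)) {
            result.add(i);
        }
        return result;
    }

    // Number of times an already recorded index arrived again.
    synchronized int duplicates() {
        return duplicates;
    }
}
```

One way to use it would be to call record(j) from onMessage and log missing() once the producer has finished and the queue has drained. Recording 1, 2, 3, 7, 8, 8, 10 against an expected total of 10 gives missing() = [4, 5, 6, 9] and duplicates() = 1, which also separates real loss from redelivery after the reconnect.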


===========================================================================================
This is how to reproduce the message loss in this environment:
1. Start the producer application and the consumer application.
2. The producer application starts sending 3000 messages to the queue "test-out".
3. The consumer application starts receiving messages from the distributed 
queue on its local broker.
4. While messages are still being sent, disconnect the network cable on the 
consumer machine.
5. The producer machine keeps sending messages to the queue "test-out", which 
is expected because these messages should be persisted in the producer 
broker's "test-out" queue. Meanwhile, the consumer side stops receiving 
messages from the "test-out" queue, which is also expected.
6. Reconnect the network cable on the consumer machine (either while the 
producer is still sending or after it has finished). The consumer broker 
starts receiving messages from the "test-out" queue again, but the log shows 
that the received-message count no longer matches the index in the message 
payload, which indicates that messages were lost.


By the way, the loss also reproduces when we avoid the duplex connection by 
defining a network connector on both sides. This is achieved by having the 
fixed-IP node connect to the dynamic-IP node through a domain name (simulated 
with a hosts file entry that resolves the name).



  
> Message loss in network of brokers when network connection break
> ----------------------------------------------------------------
>
>                 Key: AMQ-1845
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1845
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.1.0
>         Environment: Two brokers connected via TCP with one persistent 
> distributed queue and a producer and a consumer on each broker.
>            Reporter: Bryan Shaw
>
> A producer on broker A sends 2500 messages to the distributed queue at broker A.
> A consumer on broker B starts to receive messages from the distributed queue 
> on broker B.
> During the receiving process, the network between the two brokers goes down 
> and is later brought up again.
> In this scenario, we found that some messages are lost. 
> It seems that broker A keeps sending messages to broker B while the network 
> is down; these messages are removed from the queue in broker A but never 
> received by broker B, which causes the message loss.
> Is this a bug or a configuration problem? 
> I thought a configuration like this implements the store-and-forward pattern, 
> which should ensure message reliability in an unstable network.

