Hi,
We find that ActiveMQ produces duplicate messages when there are many
consumers. First we send 400,000 messages to the broker, then start consumer
threads to receive the messages. The more consumer threads we have, the more
duplicate messages appear. When we receive messages from a broker with
10 threads, duplicate messages are easily produced.

The attachments are the test Java code and the MQ config file.

The Java version and OS version are as follows:

1. Java version
java version "1.6.0_10-beta"
Java(TM) SE Runtime Environment (build 1.6.0_10-beta-b14)
Java HotSpot(TM) Server VM (build 11.0-b11, mixed mode)

2. Linux
Linux core6 2.6.9-42.ELsmp #1 SMP Wed Jul 12 23:32:02 EDT 2006 x86_64
x86_64 x86_64 GNU/Linux

And CPU is 2 X Quad-Core AMD Opteron(tm) Processor 2354 , total 8 cores


Leon Liu

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
         <property name="locations">
            <value>file:///${activemq.base}/conf/credentials.properties</value>
         </property>      
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">

        <!-- Destination specific policies using destination names or wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" memoryLimit="100mb" producerFlowControl="false"/>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <!-- The store and forward broker networks ActiveMQ will listen to -->
        <networkConnectors>
            <networkConnector name="default-nc" uri="multicast://239.255.2.9"/>
        </networkConnectors>

        <persistenceAdapter>
            <amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/data"  indexBinSize="8192" cleanupInterval="300000" maxFileLength="32 mb" forceRecoverReferenceStore="false" recoverReferenceStore="false"/>
        </persistenceAdapter>

        <!-- Use the following if you wish to configure the journal with JDBC -->
        <!-- -->
        <!-- persistenceAdapter syncOnWrite="false" directory="${activemq.base}/data"  maxFileLength="50 mb" forceRecoverReferenceStore="false" recoverReferenceStore="false">
            <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
        </persistenceAdapter -->
        

        <!-- Or if you want to use pure JDBC without a journal -->
        <!--
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="512 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="8 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="256 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!-- The transport connectors ActiveMQ will listen to -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616"/>
            <!-- transportConnector name="ssl" uri="ssl://localhost:61617"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            <transportConnector name="xmpp" uri="xmpp://localhost:61222"/ -->
        </transportConnectors>

    </broker>

    <!--
    ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker
    ** For more details see
    **
    ** http://activemq.apache.org/enterprise-integration-patterns.html
    -->
    <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">

        <!-- You can use a <package> element for each root package to search for Java routes -->
        <package>org.foo.bar</package>

        <!-- You can use Spring XML syntax to define the routes here using the <route> element -->
        <route>
            <from uri="activemq:example.A"/>
            <to uri="activemq:example.B"/>
        </route>
    </camelContext>

    <!--
    ** Lets configure some Camel endpoints
    **
    ** http://activemq.apache.org/camel/components.html
    -->

    <!-- configure the camel activemq component to use the current broker -->
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
        <property name="connectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
            <property name="userName" value="${activemq.username}"/>
            <property name="password" value="${activemq.password}"/>
          </bean>
        </property>
    </bean>



    <!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
    <!--
    <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/>
    -->


    <!-- An embedded servlet engine for serving up the Admin console -->
    <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
        <connectors>
            <nioConnector port="8161"/>
        </connectors>

        <handlers>
            <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
            <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
            <webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
        </handlers>
    </jetty>

    <!--  This xbean configuration file supports all the standard spring xml configuration options -->

    <!-- Postgres DataSource Sample Setup -->
    <!--
    <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
      <property name="serverName" value="localhost"/>
      <property name="databaseName" value="activemq"/>
      <property name="portNumber" value="0"/>
      <property name="user" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="dataSourceName" value="postgres"/>
      <property name="initialConnections" value="1"/>
      <property name="maxConnections" value="10"/>
    </bean>
    -->

    <!-- MySql DataSource Sample Setup -->
    <!-- -->
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://core6/activemq?relaxAutoCommit=true"/>
      <property name="username" value="root"/>
      <property name="password" value=""/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>
    

    <!-- Oracle DataSource Sample Setup -->
    <!--
    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
      <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
      <property name="username" value="scott"/>
      <property name="password" value="tiger"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>
    -->

    <!-- Embedded Derby DataSource Sample Setup -->
    <!-- -->
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
      <property name="databaseName" value="derbydb"/>
      <property name="createDatabase" value="create"/>
    </bean>
    

</beans>
<!-- END SNIPPET: example -->
package amq.bench;

import java.io.File;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Uses an MD5 check on each message and a hash set to detect duplicated messages.
 * @author guolin.zhuanggl 2008-12-8 11:10:34 AM
 */
public class DuplicatedMessageTest {

    // Shared logger; MD5Message in this file also logs through it.
    static Log        logger      = LogFactory.getLog("output");

    // Test parameters. The initializers are only placeholders: setup() replaces
    // each value with the system property of the same name (or its default).
    String            uri         = "tcp://coreapp2:61616";
    String            queue       = "test";
    int               messageSize = 200;
    int               threads     = 50;
    int               messages    = 20000;
    String            shutdown;
    String            wait;
    String            type;

    // Marker file built from the "shutdown" property; Watcher stops the run
    // as soon as this file exists.
    File              shutdownFile;
    // File waitFile, sendFile, recieveFile;
    ConnectionFactory factory;
    // Pattern that setup() repeats to build a text of messageSize characters.
    String            text        = "Bye sun mq,we like activemq!";
    // Written by the Watcher thread and polled by Sender threads and by the
    // JMS listener callbacks. It must be volatile, otherwise those threads
    // are not guaranteed to ever observe the shutdown request.
    volatile boolean  isShutting  = false;

    // statistics
    // Number of messages sent (type "send") or received (type "recieve").
    AtomicInteger     count       = new AtomicInteger(0);
    // Wall-clock time, in milliseconds, at which start() launched the workers.
    long              start       = 0;

    // Populated by start() only when type is "recieve"; otherwise stays null.
    Reciever[]        recievers;

    /**
     * Reads the test parameters from system properties and prepares the
     * connection factory and the padded message text.
     *
     * Properties read: uri, threads, messages, queue, messageSize, shutdown,
     * wait and type. Only "shutdown" has no default and is therefore required.
     *
     * @throws IllegalArgumentException if the "shutdown" property is not set
     * @throws NumberFormatException if threads, messages or messageSize is not an integer
     */
    public void setup() {
        logger.info("Start setup bench test .");
        uri = System.getProperty("uri", "tcp://core6:61616");
        threads = Integer.parseInt(System.getProperty("threads", "50"));
        messages = Integer.parseInt(System.getProperty("messages", "20000"));
        queue = System.getProperty("queue", "test");
        messageSize = Integer.parseInt(System.getProperty("messageSize", "200"));
        shutdown = System.getProperty("shutdown");
        wait = System.getProperty("wait");
        type = System.getProperty("type", "send");

        // new File(null) would fail with a bare NullPointerException; report
        // the missing property by name instead.
        if (shutdown == null) {
            throw new IllegalArgumentException(
                "System property 'shutdown' (path of the shutdown marker file) must be set.");
        }
        shutdownFile = new File(shutdown);
        // waitFile = new File(wait);
        // sendFile = new File(waitFile.getParent() + "/" + "bench.send." + messages + "." + messageSize);
        // recieveFile = new File(waitFile.getParent() + "/" + "bench.recieve");
        //
        // logger.info(waitFile + "," + sendFile + "," + recieveFile);
        logger.info("Init a bench.");
        factory = new ActiveMQConnectionFactory(uri);

        // Repeat the pattern cyclically so that every one of the messageSize
        // characters is filled. Copying only whole repetitions left trailing
        // NUL characters, and nothing but NULs when messageSize was shorter
        // than the pattern.
        char[] pattern = text.toCharArray();
        char[] padded = new char[messageSize];
        for (int i = 0; i < padded.length; i++) {
            padded[i] = pattern[i % pattern.length];
        }
        text = String.copyValueOf(padded);

        logger.info("uri = " + uri);
        logger.info("threads = " + threads);
        logger.info("messageSize = " + messageSize);
        logger.info("End setup bench test .");
    }

    /**
     * Launches the worker threads selected by {@code type} ("send" or
     * "recieve") and then the Watcher thread.
     *
     * If the shutdown file does not exist yet, it is created, kept for one
     * second and deleted again before the workers start.
     * NOTE(review): presumably this is meant to stop an earlier run that
     * watches the same file - confirm.
     */
    public void start() {
        if (!shutdownFile.exists()) {
            try {
                shutdownFile.createNewFile();
                // sleep is static; call it on the class, not on an instance.
                Thread.sleep(1000);
                shutdownFile.delete();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                logger.error("Create shutdown file failed. ", e);
            } catch (Exception e) {
                logger.error("Create shutdown file failed. ", e);
            }
        }

        final boolean sending = "send".equals(type);
        final boolean receiving = "recieve".equals(type);
        if (!sending && !receiving) {
            // Previously an unknown type silently started no worker at all.
            logger.error("Unknown type '" + type + "' (expected 'send' or 'recieve'); no worker is started.");
        }

        if (receiving) {
            recievers = new Reciever[threads];
        }
        start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            if (sending) {
                new Sender(i).start();
            } else if (receiving) {
                recievers[i] = new Reciever(i);
                recievers[i].start();
            }
        }
        new Watcher().start();
    }

    // Sequence number generator shared by all senders; each created message
    // takes the next value.
    static AtomicInteger id = new AtomicInteger(0);

    /**
     * Builds the next test message: an ObjectMessage whose payload is an
     * MD5Message carrying the next sequence number.
     *
     * @param session the JMS session used to create the message
     * @return the newly created message
     * @throws JMSException if the session cannot create or fill the message
     */
    Message createMessage(Session session) throws JMSException {
        // The payload is built first, so the sequence number is consumed
        // before the session is asked for a message.
        return session.createObjectMessage(new MD5Message(id.incrementAndGet()));
    }

    /**
     * Entry point: configures a test instance from system properties and
     * starts it. Exits with status -1 when the configuration step fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final DuplicatedMessageTest runner = new DuplicatedMessageTest();

        Exception setupFailure = null;
        try {
            runner.setup();
        } catch (Exception e) {
            setupFailure = e;
        }

        if (setupFailure != null) {
            logger.error(setupFailure);
            System.exit(-1);
        }
        runner.start();
    }

    /**
     * Monitor thread. On every pass it stops the run if the shutdown file
     * exists, or closes the receivers and ends once the message counter has
     * reached threads * messages. Otherwise it sleeps ten seconds and logs a
     * statistics line: elapsed milliseconds, total count, count since the
     * previous line, messages per second, and the number of distinct payloads.
     */
    class Watcher extends Thread {

        public Watcher(){
        }

        public void run() {
            int latest = -1;
            int previous = 0;

            for (;;) {
                // An existing shutdown file is the external stop request.
                if (shutdownFile.exists()) {
                    isShutting = true;
                    logger.info("Task is shut,watch thread is shutting... ...");
                    return;
                }
                if (count.get() >= threads * messages) {
                    logger.info("Task finished,thread is shutting... ...");
                    closeRecievers();
                    return;
                }

                try {
                    sleep(10000);
                    if (latest == threads * messages) {
                        logger.info("Task finished,watch thread is shutting... ...");
                    }
                } catch (Exception e) {
                    logger.error("watch tread interrupted!");
                }

                previous = latest;
                latest = count.get();
                long elapsed = System.currentTimeMillis() - start;
                String line = String.format("Statistics: %8d %8d %8d %8d %8d", elapsed, count.get(),
                                            latest - previous, (int) (latest / (elapsed / 1000f)),
                                            messageSet.size());
                logger.info(line);
            }
        }

        /** Closes every receiver, if any were created; a failing close is logged and skipped. */
        private void closeRecievers() {
            if (recievers == null) {
                return;
            }
            for (Reciever rec : recievers) {
                try {
                    rec.close();
                } catch (Exception e) {
                    logger.error("reciever close failed. ", e);
                }
            }
        }
    }

    // Payload texts received so far, shared by all receivers. Duplicate
    // detection relies on the return value of add(), see Reciever.start().
    Set<String> messageSet = Collections.synchronizedSet(new HashSet<String>());

    /**
     * One consumer with its own connection and CLIENT_ACKNOWLEDGE session on
     * the test queue. Received messages are acknowledged, counted, validated
     * against their MD5 digest and checked against messageSet for duplicates.
     */
    class Reciever {

        String          name;
        Connection      connection;
        MessageConsumer consumer;
        Destination     destination;
        Session         session;

        // Message message;

        /**
         * Opens the connection, session and consumer.
         *
         * @param id index of this receiver, used only in its name
         * @throws RuntimeException wrapping the JMSException if any JMS object cannot be created
         */
        public Reciever(int id){
            name = "Bench reciever thread#" + id;
            try {
                connection = factory.createConnection();
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                destination = session.createQueue(queue);
                consumer = session.createConsumer(destination);
                // message = session.createTextMessage(text);
            } catch (JMSException e) {
                logger.error(name + " init failed. " + e.getMessage());
                // Do not leak a connection that was opened before the failure.
                closeConnectionQuietly();
                throw new RuntimeException(e);
            }
        }

        /**
         * Detaches the listener and releases all JMS resources. The connection
         * is closed even if closing the consumer or the session fails.
         */
        public void close() {
            try {
                consumer.setMessageListener(null);
                consumer.close();
                session.close();
            } catch (JMSException e) {
                logger.error(e);
            } finally {
                closeConnectionQuietly();
            }
        }

        /** Closes the connection if one was opened; a failure is only logged. */
        private void closeConnectionQuietly() {
            if (connection == null) {
                return;
            }
            try {
                connection.close();
            } catch (JMSException e) {
                logger.error(e);
            }
        }

        /** Starts delivery and installs the listener that performs the checks. */
        public void start() {
            try {
                connection.start();
                consumer.setMessageListener(new MessageListener() {

                    public void onMessage(Message message) {
                        try {
                            if (isShutting) {
                                // logger.info("Reciever is shutting... ...");
                                return;
                            }
                            if (count.get() >= threads * messages) {
                                // logger.info("Task finished,thread is shutting... ...");
                                return;
                            }
                            message.acknowledge();
                            count.incrementAndGet();
                            if (message instanceof ObjectMessage) {
                                MD5Message md = (MD5Message) ((ObjectMessage) message).getObject();
                                if (!md.validate()) {
                                    logger.error("###############Message is error. " + message
                                                 + "################################");
                                }
                                // add() tests and inserts in one atomic step. A separate
                                // contains()/add() pair let two listener threads holding
                                // the same payload both pass the check unreported.
                                if (!messageSet.add(md.message)) {
                                    logger.error("---------------------Message is duplicated. message:" + md.message
                                                 + " id:" + message.getJMSMessageID() + "consumer: "
                                                 + consumer.hashCode() + "------------------------");
                                }
                            }
                        } catch (JMSException e) {
                            logger.error(e);
                        }
                    }
                });
            } catch (JMSException e) {
                logger.error(e);
            }
        }
    }

    /**
     * Daemon thread that sends test messages to the queue over its own
     * connection until the shared counter reaches threads * messages or a
     * shutdown is requested.
     */
    class Sender extends Thread {

        String          name;
        Connection      connection;
        MessageProducer producer;
        Destination     destination;
        Session         session;

        /**
         * Opens the connection, session and producer.
         *
         * @param id index of this sender, used only in its name
         * @throws RuntimeException wrapping the JMSException if any JMS object cannot be created
         */
        public Sender(int id){
            name = "Bench send thread#" + id;
            try {
                connection = factory.createConnection();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue(queue);
                producer = session.createProducer(destination);

            } catch (JMSException e) {
                logger.error(e);
                // Do not leak a connection that was opened before the failure.
                closeConnectionQuietly();
                throw new RuntimeException(e);
            }
            setDaemon(true);
        }

        public void run() {
            try {
                connection.start();
                for (;;) {
                    if (isShutting) {
                        logger.info("Sender is shutting... ...");
                        return;
                    }
                    // The limit check, the send and the increment happen under one
                    // lock so the counter never exceeds threads * messages and only
                    // counts sends that have completed.
                    synchronized (count) {
                        if (count.get() >= threads * messages) {
                            logger.info("Task finished,thread is shutting... ...");
                            return;
                        }
                        try {
                            producer.send(createMessage(session));
                            count.incrementAndGet();
                        } catch (JMSException e) {
                            // A failed send is not counted; the loop tries again.
                            logger.error("Send message error" + e.getMessage());
                        }
                    }
                }
            } catch (JMSException e) {
                logger.error(e);
            } finally {
                // Close the connection even when closing the producer or the
                // session throws; previously such a failure skipped it.
                try {
                    producer.close();
                    session.close();
                } catch (JMSException e) {
                    logger.error(e);
                } finally {
                    closeConnectionQuietly();
                }
            }

        }

        /** Closes the connection if one was opened; a failure is only logged. */
        private void closeConnectionQuietly() {
            if (connection == null) {
                return;
            }
            try {
                connection.close();
            } catch (JMSException e) {
                logger.error(e);
            }
        }
    }
}

/**
 * Serializable test payload: a text of the form "Message##&lt;id&gt;" together
 * with the MD5 digest of that text, so a receiver can detect corruption.
 *
 * No serialVersionUID is declared and only a private helper is added, so the
 * serialized form stays compatible with instances written by the previous
 * version of this class.
 */
class MD5Message implements Serializable {

    /**
     * Creates the payload for the given sequence number and computes its digest.
     * If the digest cannot be computed the error is logged and md5 stays null,
     * in which case validate() returns false.
     *
     * @param id sequence number embedded in the message text
     */
    public MD5Message(int id){

        this.message = "Message##" + id;
        try {
            md5 = digest(message);
        } catch (Exception e) {
            DuplicatedMessageTest.logger.error("MD5 error. " + e);
        }
    }

    /**
     * Recomputes the digest of the message text and compares it with the
     * stored digest.
     *
     * @return true only if both fields are present and the digests match;
     *         false on a mismatch, a missing field, or a digest failure
     */
    public boolean validate() {
        // A missing text or digest can never be valid. Previously the resulting
        // NullPointerException was caught and reported as success.
        if (message == null || md5 == null) {
            return false;
        }
        try {
            return MessageDigest.isEqual(md5, digest(message));
        } catch (Exception e) {
            DuplicatedMessageTest.logger.error("MD5 error. " + e);
            // A check that could not be performed must not count as passed.
            return false;
        }
    }

    /**
     * Computes the MD5 digest of the text. The charset is fixed to UTF-8 so
     * that sender and receiver agree regardless of each host's default charset.
     */
    private static byte[] digest(String text)
            throws java.security.NoSuchAlgorithmException, java.io.UnsupportedEncodingException {
        return MessageDigest.getInstance("MD5").digest(text.getBytes("UTF-8"));
    }

    String message;
    byte[] md5;
}

Reply via email to