Hi, We have found that ActiveMQ delivers duplicate messages when a queue has many consumers. First we send 400,000 messages to the broker, then we start the consumer threads to receive them. The more consumer threads we use, the more duplicate messages appear. When we receive messages from a broker with 10 threads, duplicate messages are easily reproduced.
The attachments are the Java test code and the MQ config file. The Java and OS versions are as follows: 1. Java version: java version "1.6.0_10-beta" Java(TM) SE Runtime Environment (build 1.6.0_10-beta-b14) Java HotSpot(TM) Server VM (build 11.0-b11, mixed mode) 2. Linux: Linux core6 2.6.9-42.ELsmp #1 SMP Wed Jul 12 23:32:02 EDT 2006 x86_64 x86_64 x86_64 GNU/Linux. The CPU is 2 x Quad-Core AMD Opteron(tm) Processor 2354, 8 cores in total. Leon Liu
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <!-- START SNIPPET: example --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <!-- Allows us to use system properties as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:///${activemq.base}/conf/credentials.properties</value> </property> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data"> <!-- Destination specific policies using destination names or wildcards --> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" memoryLimit="100mb" producerFlowControl="false"/> </policyEntries> </policyMap> </destinationPolicy> <!-- Use the following to configure how ActiveMQ is 
exposed in JMX --> <managementContext> <managementContext createConnector="true"/> </managementContext> <!-- The store and forward broker networks ActiveMQ will listen to --> <networkConnectors> <networkConnector name="default-nc" uri="multicast://239.255.2.9"/> </networkConnectors> <persistenceAdapter> <amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/data" indexBinSize="8192" cleanupInterval="300000" maxFileLength="32 mb" forceRecoverReferenceStore="false" recoverReferenceStore="false"/> </persistenceAdapter> <!-- Use the following if you wish to configure the journal with JDBC --> <!-- --> <!-- persistenceAdapter syncOnWrite="false" directory="${activemq.base}/data" maxFileLength="50 mb" forceRecoverReferenceStore="false" recoverReferenceStore="false"> <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> </persistenceAdapter --> <!-- Or if you want to use pure JDBC without a journal --> <!-- <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#postgres-ds"/> </persistenceAdapter> --> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="512 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="8 gb" name="foo"/> </storeUsage> <tempUsage> <tempUsage limit="256 mb"/> </tempUsage> </systemUsage> </systemUsage> <!-- The transport connectors ActiveMQ will listen to --> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616"/> <!-- transportConnector name="ssl" uri="ssl://localhost:61617"/> <transportConnector name="stomp" uri="stomp://localhost:61613"/> <transportConnector name="xmpp" uri="xmpp://localhost:61222"/ --> </transportConnectors> </broker> <!-- ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker ** For more details see ** ** http://activemq.apache.org/enterprise-integration-patterns.html --> <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring"> <!-- You can use a <package> element for each root package to 
search for Java routes --> <package>org.foo.bar</package> <!-- You can use Spring XML syntax to define the routes here using the <route> element --> <route> <from uri="activemq:example.A"/> <to uri="activemq:example.B"/> </route> </camelContext> <!-- ** Lets configure some Camel endpoints ** ** http://activemq.apache.org/camel/components.html --> <!-- configure the camel activemq component to use the current broker --> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" > <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" /> <property name="userName" value="${activemq.username}"/> <property name="password" value="${activemq.password}"/> </bean> </property> </bean> <!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic --> <!-- <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/> --> <!-- An embedded servlet engine for serving up the Admin console --> <jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> <connectors> <nioConnector port="8161"/> </connectors> <handlers> <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/> <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/> <webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/> </handlers> </jetty> <!-- This xbean configuration file supports all the standard spring xml configuration options --> <!-- Postgres DataSource Sample Setup --> <!-- <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource"> <property name="serverName" value="localhost"/> <property name="databaseName" value="activemq"/> <property name="portNumber" value="0"/>
<property name="user" value="activemq"/> <property name="password" value="activemq"/> <property name="dataSourceName" value="postgres"/> <property name="initialConnections" value="1"/> <property name="maxConnections" value="10"/> </bean> --> <!-- MySql DataSource Sample Setup --> <!-- --> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://core6/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value=""/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> <!-- Oracle DataSource Sample Setup --> <!-- <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/> <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/> <property name="username" value="scott"/> <property name="password" value="tiger"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> --> <!-- Embedded Derby DataSource Sample Setup --> <!-- --> <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource"> <property name="databaseName" value="derbydb"/> <property 
name="createDatabase" value="create"/> </bean> </beans> <!-- END SNIPPET: example -->
package amq.bench; import java.io.File; import java.io.PrintWriter; import java.io.Serializable; import java.io.StringWriter; import java.security.MessageDigest; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * with md5 check message and a hashset validate if there are duplicated message * @author guolin.zhuanggl 2008-12-8 ÉÏÎç11:10:34 */ public class DuplicatedMessageTest { static Log logger = LogFactory.getLog("output"); String uri = "tcp://coreapp2:61616"; String queue = "test"; int messageSize = 200; int threads = 50; int messages = 20000; String shutdown; String wait; String type; File shutdownFile; // File waitFile, sendFile, recieveFile; ConnectionFactory factory; String text = "Bye sun mq,we like activemq!"; boolean isShutting = false; // statistics AtomicInteger count = new AtomicInteger(0); long start = 0; Reciever[] recievers; public void setup() { logger.info("Start setup bench test ."); uri = System.getProperty("uri", "tcp://core6:61616"); threads = Integer.parseInt(System.getProperty("threads", "50")); messages = Integer.parseInt(System.getProperty("messages", "20000")); queue = System.getProperty("queue", "test"); messageSize = Integer.parseInt(System.getProperty("messageSize", "200")); shutdown = System.getProperty("shutdown"); wait = System.getProperty("wait"); type = System.getProperty("type", "send"); shutdownFile = new File(shutdown); // waitFile = new File(wait); // sendFile = new File(waitFile.getParent() + "/" + 
"bench.send." + messages + "." + messageSize); // recieveFile = new File(waitFile.getParent() + "/" + "bench.recieve"); // // logger.info(waitFile + "," + sendFile + "," + recieveFile); logger.info("Init a bench."); factory = new ActiveMQConnectionFactory(uri); char[] tmp = new char[messageSize]; char[] src = text.toCharArray(); for (int i = 0; i < tmp.length / text.length(); i++) { System.arraycopy(src, 0, tmp, i * src.length, src.length); } text = String.copyValueOf(tmp); logger.info("uri = " + uri); logger.info("threads = " + threads); logger.info("messageSize = " + messageSize); logger.info("End setup bench test ."); } public void start() { if (!shutdownFile.exists()) { try { shutdownFile.createNewFile(); Thread.currentThread().sleep(1000); shutdownFile.delete(); } catch (Exception e) { logger.error("Create shutdown file failed. ", e); } } if (type.equals("recieve")) { recievers = new Reciever[threads]; } start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { if (type.equals("send")) { new Sender(i).start(); } else if (type.equals("recieve")) { recievers[i] = new Reciever(i); recievers[i].start(); } } new Watcher().start(); } static AtomicInteger id = new AtomicInteger(0); Message createMessage(Session session) throws JMSException { MD5Message md5 = new MD5Message(id.incrementAndGet()); ObjectMessage ret = session.createObjectMessage(); ret.setObject(md5); return ret; } public static void main(String[] args) { DuplicatedMessageTest bench = new DuplicatedMessageTest(); try { bench.setup(); } catch (Exception e) { logger.error(e); System.exit(-1); } bench.start(); } class Watcher extends Thread { public Watcher(){ } public void run() { int c = -1, c0 = 0; while (true) { if (shutdownFile.exists()) { isShutting = true; logger.info("Task is shut,watch thread is shutting... ..."); return; } if (count.get() >= threads * messages) { logger.info("Task finished,thread is shutting... 
..."); if (recievers != null) { for (Reciever rec : recievers) { try { rec.close(); } catch (Exception e) { logger.error("reciever close failed. ", e); } } } return; } int load = 0; try { // load = getServerLoad(""); sleep(10000); if (c == threads * messages) { logger.info("Task finished,watch thread is shutting... ..."); } } catch (Exception e) { logger.error("watch tread interrupted!"); } c0 = c; c = count.get(); StringWriter str = new StringWriter(); PrintWriter writer = new PrintWriter(str); long t = System.currentTimeMillis() - start; writer.printf("Statistics: %8d %8d %8d %8d %8d", t, count.get(), c - c0, (int) (c / (t / 1000f)), messageSet.size()); logger.info(str.toString()); // System.out.println(str); } } } Set<String> messageSet = Collections.synchronizedSet(new HashSet<String>()); class Reciever { String name; Connection connection; MessageConsumer consumer; Destination destination; Session session; // Message message; public Reciever(int id){ try { name = "Bench reciever thread#" + id; connection = factory.createConnection(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue(queue); consumer = session.createConsumer(destination); // message = session.createTextMessage(text); } catch (JMSException e) { logger.error(name + " init failed. " + e.getMessage()); throw new RuntimeException(e); } } public void close() { try { consumer.setMessageListener(null); consumer.close(); session.close(); connection.close(); } catch (JMSException e) { logger.error(e); } } public void start() { try { connection.start(); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { if (isShutting) { // logger.info("Reciever is shutting... ..."); return; } if (count.get() >= threads * messages) { // logger.info("Task finished,thread is shutting... 
..."); return; } message.acknowledge(); count.incrementAndGet(); if (message instanceof ObjectMessage) { MD5Message md = (MD5Message) ((ObjectMessage) message).getObject(); if (!md.validate()) { logger.error("###############Message is error. " + message + "################################"); } if (messageSet.contains(md.message)) { logger.error("---------------------Message is duplicated. message:" + md.message + " id:" + message.getJMSMessageID() + "consumer: " + consumer.hashCode() + "------------------------"); } else { messageSet.add(md.message); } } } catch (JMSException e) { logger.error(e); } } }); } catch (JMSException e) { logger.error(e); } } } class Sender extends Thread { String name; Connection connection; MessageProducer producer; Destination destination; Session session; public Sender(int id){ try { name = "Bench send thread#" + id; connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queue); producer = session.createProducer(destination); } catch (JMSException e) { logger.error(e); throw new RuntimeException(e); } setDaemon(true); } public void run() { try { connection.start(); for (;;) { if (isShutting) { logger.info("Sender is shutting... ..."); return; } synchronized (count) { if (count.get() >= threads * messages) { logger.info("Task finished,thread is shutting... 
..."); return; } try { producer.send(createMessage(session)); count.incrementAndGet(); } catch (JMSException e) { logger.error("Send message error" + e.getMessage()); } } } } catch (JMSException e) { logger.error(e); } finally { try { producer.close(); session.close(); connection.close(); } catch (JMSException e) { logger.error(e); } } } } } class MD5Message implements Serializable { public MD5Message(int id){ this.message = "Message##" + id; try { MessageDigest md = MessageDigest.getInstance("MD5"); byte[] data = md.digest(message.getBytes()); md5 = data; } catch (Exception e) { DuplicatedMessageTest.logger.error("MD5 error. " + e); } } public boolean validate() { try { MessageDigest md = MessageDigest.getInstance("MD5"); byte[] data = md.digest(message.getBytes()); if (data.length != md5.length) { return false; } for (int i = 0; i < data.length; i++) { if (md5[i] != data[i]) { return false; } } } catch (Exception e) { DuplicatedMessageTest.logger.error("MD5 error. " + e); } return true; } String message; byte[] md5; }