you would need durable subs to guarantee delivery, if the producers failover before all the consumers they will miss messages sent while they were missing.
On 28 January 2011 15:39, S Sharma <sspr...@gmail.com> wrote: > Bumping this request. This issue is hindering us using ActiveMQ in our > enterprise. > > Thanks so much! > > Sharma > > > On Wed, Jan 26, 2011 at 5:34 PM, S Sharma <sspr...@gmail.com> wrote: > >> Hi all, >> >> I am getting significant message drops in activemq-5.4.2 pure master-slave >> configuration. I am testing topics using the simple producer & asynchronous >> consumer from the activemq-cpp-library-3.2.4 (code attached). >> >> The master and slave configurations are also attached. I am testing >> (persistent) message drops by: >> >> 1. Create four producers from SimpleProducer.cpp, having line 135 as: >> string text = (string)("App1 Seq No: ") + ix_str; >> string text = (string)("App2 Seq No: ") + ix_str; >> string text = (string)("App3 Seq No: ") + ix_str; and >> string text = (string)("App4 Seq No: ") + ix_str; >> >> 2. Create four consumers (identical binaries) from SimpleAsyncConsumer.cpp >> all listening to topic "TEST.FOO". >> >> 3. Start all consumers: >> ./simple_async_consumer1 > consumer1, ./simple_async_consumer2 > consumer2, >> ./simple_async_consumer3 > consumer3, ./simple_async_consumer4 > consumer4 >> >> 4. Then start all producers: >> ./simple_producer1 > producer1 & ./simple_producer2 > producer2 & >> ./simple_producer3 > producer3 & ./simple_producer4 > producer4 & >> >> 5. Terminate master broker: kill -9 <MasterPID> >> >> In both slave configurations: shutdownOnMasterFailure="true" and >> shutdownOnMasterFailure="false" with manual copying kahadb, I am noticing >> that the four consumers do not receive several messages which are sent >> (likely during the failover process). For instance, attached consumer1.txt >> file has <40001 messages received. >> >> Can someone please help me decipher what is going wrong? If you need any >> clarification on the test case, please let me know. >> >> Master: >> >> <beans >> xmlns="http://www.springframework.org/schema/beans" >> xmlns:amq="http://activemq.apache.org/schema/core" >> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >> xsi:schemaLocation="http://www.springframework.org/schema/beans >> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd >> http://activemq.apache.org/schema/core >> http://activemq.apache.org/schema/core/activemq-core.xsd"> >> >> <broker >> xmlns="http://activemq.apache.org/schema/core" >> brokerName="master" >> dataDirectory="/home/activemq/activemq/data" >> waitForSlave="false" > >> <destinationPolicy> >> <policyMap> >> <policyEntries> >> <policyEntry topic=">" producerFlowControl="false" >> memoryLimit="1500mb"> >> <pendingSubscriberPolicy> >> <vmCursor /> >> </pendingSubscriberPolicy> >> </policyEntry> >> <policyEntry queue=">" producerFlowControl="false" >> memoryLimit="1500mb"> >> </policyEntry> >> </policyEntries> >> </policyMap> >> </destinationPolicy> >> >> >> <managementContext> >> <managementContext createConnector="false"/> >> </managementContext> >> >> >> <persistenceAdapter> >> <kahaDB directory="${activemq.base}/data/kahadb" >> cleanupInterval="300000" checkpointInterval="50000" >> journalMaxWriteBatchSize="6400k" >> journalMaxFileLength="100g" >> indexCacheSize="100000" indexWriteBatchSize="100000" >> /> >> </persistenceAdapter> >> >> <transportConnectors> >> <transportConnector name="openwire" uri="tcp:// >> 0.0.0.0:61616?wireFormat.maxInactivityDuration=0"/> >> </transportConnectors> >> >> </broker> >> >> <import resource="jetty.xml"/> >> >> </beans> >> >> >> Slave: >> >> <beans >> xmlns="http://www.springframework.org/schema/beans" >> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >> xsi:schemaLocation="http://www.springframework.org/schema/beans >> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd >> http://activemq.apache.org/schema/core >> http://activemq.apache.org/schema/core/activemq-core.xsd >> http://camel.apache.org/schema/spring >> http://camel.apache.org/schema/spring/camel-spring.xsd"> >> >> <bean >> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> >> >> <broker masterConnectorURI="tcp://masterIPAddress:61616" >> brokerName="slave" xmlns="http://activemq.apache.org/schema/core" >> shutdownOnMasterFailure="false" > >> >> <destinationPolicy> >> <policyMap> >> <policyEntries> >> <policyEntry queue=">" optimizedDispatch="true" /> >> <policyEntry topic=">" optimizedDispatch="true" /> >> </policyEntries> >> </policyMap> >> </destinationPolicy> >> >> <persistenceAdapter> >> <kahaDB directory="${activemq.base}/data/kahadb" >> cleanupInterval="300000" checkpointInterval="50000" >> journalMaxWriteBatchSize="6400k" >> journalMaxFileLength="100g" >> indexCacheSize="100000" indexWriteBatchSize="100000" >> /> >> </persistenceAdapter> >> <transportConnectors> >> <transportConnector uri="tcp://0.0.0.0:61616"/> >> </transportConnectors> >> </broker> >> </beans> >> >> > -- http://blog.garytully.com http://fusesource.com