You would need durable subscriptions to guarantee delivery. If the
producers fail over before all the consumers do, the consumers will miss
the messages sent while they were disconnected.

On 28 January 2011 15:39, S Sharma <sspr...@gmail.com> wrote:
> Bumping this request. This issue is hindering us from using ActiveMQ in our
> enterprise.
>
> Thanks so much!
>
> Sharma
>
>
> On Wed, Jan 26, 2011 at 5:34 PM, S Sharma <sspr...@gmail.com> wrote:
>
>> Hi all,
>>
>> I am getting significant message drops in an activemq-5.4.2 pure
>> master-slave configuration. I am testing topics using the simple producer
>> and asynchronous consumer from activemq-cpp-library-3.2.4 (code attached).
>>
>> The master and slave configurations are also attached. I am testing
>> (persistent) message drops by:
>>
>> 1. Create four producers from SimpleProducer.cpp, having line 135 as:
>> string text = (string)("App1 Seq No: ") + ix_str;
>> string text = (string)("App2 Seq No: ") + ix_str;
>> string text = (string)("App3 Seq No: ") + ix_str; and
>> string text = (string)("App4 Seq No: ") + ix_str;
>>
>> 2. Create four consumers (identical binaries) from SimpleAsyncConsumer.cpp
>> all listening to topic "TEST.FOO".
>>
>> 3. Start all consumers:
>> ./simple_async_consumer1 > consumer1, ./simple_async_consumer2 > consumer2,
>> ./simple_async_consumer3 > consumer3, ./simple_async_consumer4 > consumer4
>>
>> 4. Then start all producers:
>> ./simple_producer1  > producer1 & ./simple_producer2 > producer2  &
>> ./simple_producer3 > producer3 & ./simple_producer4 > producer4 &
>>
>> 5. Terminate master broker: kill -9 <MasterPID>
>>
>> In both slave configurations, shutdownOnMasterFailure="true" and
>> shutdownOnMasterFailure="false" with manual copying of kahadb, I am noticing
>> that the four consumers do not receive several of the messages that are sent
>> (likely during the failover process). For instance, the attached
>> consumer1.txt file has <40001 messages received.
>>
>> Can someone please help me decipher what is going wrong? If you need any
>> clarification on the test case, please let me know.
>>
>> Master:
>>
>> <beans
>>   xmlns="http://www.springframework.org/schema/beans"
>>   xmlns:amq="http://activemq.apache.org/schema/core"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>>   http://activemq.apache.org/schema/core
>> http://activemq.apache.org/schema/core/activemq-core.xsd">
>>
>>     <broker
>>         xmlns="http://activemq.apache.org/schema/core"
>>         brokerName="master"
>>         dataDirectory="/home/activemq/activemq/data"
>>         waitForSlave="false" >
>>         <destinationPolicy>
>>             <policyMap>
>>               <policyEntries>
>>                 <policyEntry topic=">" producerFlowControl="false"
>> memoryLimit="1500mb">
>>                   <pendingSubscriberPolicy>
>>                     <vmCursor />
>>                   </pendingSubscriberPolicy>
>>                 </policyEntry>
>>                 <policyEntry queue=">" producerFlowControl="false"
>> memoryLimit="1500mb">
>>                 </policyEntry>
>>               </policyEntries>
>>             </policyMap>
>>         </destinationPolicy>
>>
>>
>>         <managementContext>
>>             <managementContext createConnector="false"/>
>>         </managementContext>
>>
>>
>>         <persistenceAdapter>
>>             <kahaDB directory="${activemq.base}/data/kahadb"
>>                 cleanupInterval="300000" checkpointInterval="50000"
>>                 journalMaxWriteBatchSize="6400k"
>>                 journalMaxFileLength="100g"
>>                 indexCacheSize="100000" indexWriteBatchSize="100000"
>>              />
>>         </persistenceAdapter>
>>
>>         <transportConnectors>
>>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?wireFormat.maxInactivityDuration=0"/>
>>         </transportConnectors>
>>
>>     </broker>
>>
>>     <import resource="jetty.xml"/>
>>
>> </beans>
>>
>>
>> Slave:
>>
>> <beans
>>   xmlns="http://www.springframework.org/schema/beans"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>>   http://activemq.apache.org/schema/core
>> http://activemq.apache.org/schema/core/activemq-core.xsd
>>   http://camel.apache.org/schema/spring
>> http://camel.apache.org/schema/spring/camel-spring.xsd">
>>
>>   <bean
>> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>>
>>   <broker masterConnectorURI="tcp://masterIPAddress:61616"
>> brokerName="slave" xmlns="http://activemq.apache.org/schema/core"
>> shutdownOnMasterFailure="false" >
>>
>>         <destinationPolicy>
>>             <policyMap>
>>                 <policyEntries>
>>                     <policyEntry queue=">" optimizedDispatch="true" />
>>                     <policyEntry topic=">" optimizedDispatch="true" />
>>                 </policyEntries>
>>             </policyMap>
>>         </destinationPolicy>
>>
>>         <persistenceAdapter>
>>             <kahaDB directory="${activemq.base}/data/kahadb"
>>                 cleanupInterval="300000" checkpointInterval="50000"
>>                 journalMaxWriteBatchSize="6400k"
>>                 journalMaxFileLength="100g"
>>                 indexCacheSize="100000" indexWriteBatchSize="100000"
>>              />
>>         </persistenceAdapter>
>>     <transportConnectors>
>>       <transportConnector uri="tcp://0.0.0.0:61616"/>
>>     </transportConnectors>
>>   </broker>
>> </beans>
>>
>>
>



-- 
http://blog.garytully.com
http://fusesource.com
