I'm using ActiveMQ 5.5.1 in a pure master/slave configuration. Both master and slave have schedulerSupport="true" configured.

I submit a message to the master via the STOMP connector for delayed delivery (in my case, 5000 ms later). The message is accepted, and the scheduled delivery occurs. On the master, the subscribed consumer receives the message, processes it, ACKs it, and schedules another for 5000 ms hence. The reason for this (rather than just using a cron scheduler) is that an incremented counter (non-monotonic) needs to be passed through the message.

On the slave, the replicated queue shows an ever-growing number of messages which never get consumed. I'm concerned that either the slave will run out of memory or that, in the event of a master failure, the slave will have a backlog of potentially millions of messages which would be reprocessed.

Presumably this isn't normal behaviour. Can anyone help?

My master and slave configurations are below, with example code (in Ruby) to trigger the behaviour (to start it off, publish something to the "trigger" queue):

== Begin MASTER config XML ==
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="mq01.a"
        dataDirectory="${activemq.base}/data"
        schedulerSupport="true"
        destroyApplicationContextOnStop="true">
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
          <pendingSubscriberPolicy>
            <vmCursor />
          </pendingSubscriberPolicy>
        </policyEntry>
        <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
          <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <managementContext>
    <managementContext createConnector="false"/>
  </managementContext>
  <networkConnectors>
    <networkConnector name="AtoBfrom01"
                      uri="static:failover:(tcp://mq01.b:61616,tcp://mq02.b:61616)?maxReconnectAttempts=1&amp;randomize=true"
                      dynamicOnly="true"
                      duplex="true"
                      conduitSubscriptions="false"/>
  </networkConnectors>
  <persistenceAdapter>
    <kahaDB directory="${activemq.base}/data/kahadb"/>
  </persistenceAdapter>
  <systemUsage>
    <systemUsage>
      <memoryUsage>
        <memoryUsage limit="1 gb"/>
      </memoryUsage>
      <storeUsage>
        <storeUsage limit="0"/>
      </storeUsage>
      <tempUsage>
        <tempUsage limit="500 mb"/>
      </tempUsage>
    </systemUsage>
  </systemUsage>
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61612?transport.closeAsync=false"/>
    <transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
  </transportConnectors>
</broker>
== End MASTER config XML ==

== Begin SLAVE config XML ==
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="mq02.a"
        masterConnectorURI="tcp://mq01.a:61616"
        dataDirectory="${activemq.base}/data"
        schedulerSupport="true"
        shutdownOnMasterFailure="false"
        destroyApplicationContextOnStop="true">
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
          <pendingSubscriberPolicy>
            <vmCursor />
          </pendingSubscriberPolicy>
        </policyEntry>
        <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
          <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <managementContext>
    <managementContext createConnector="false"/>
  </managementContext>
  <networkConnectors>
    <networkConnector name="AtoBfrom02"
                      uri="static:failover:(tcp://mq01.b:61616,tcp://mq02.b:61616)?maxReconnectAttempts=1&amp;randomize=true"
                      dynamicOnly="true"
                      duplex="true"
                      conduitSubscriptions="false"/>
  </networkConnectors>
  <persistenceAdapter>
    <kahaDB directory="${activemq.base}/data/kahadb"/>
  </persistenceAdapter>
  <systemUsage>
    <systemUsage>
      <memoryUsage>
        <memoryUsage limit="1 gb"/>
      </memoryUsage>
      <storeUsage>
        <storeUsage limit="0"/>
      </storeUsage>
      <tempUsage>
        <tempUsage limit="500 mb"/>
      </tempUsage>
    </systemUsage>
  </systemUsage>
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61612?transport.closeAsync=false"/>
    <transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
  </transportConnectors>
</broker>
== End SLAVE config XML ==

== BEGIN example Ruby ==
#!/usr/bin/ruby
require 'rubygems'
require 'stomp'

# Try mq01 first, then fall back to mq02.
stomp_config = {
  :hosts => [
    { :host => "mq01", :port => 61613, :ssl => false },
    { :host => "mq02", :port => 61613, :ssl => false }
  ],
  :randomize => false
}

consumer = Stomp::Client.new(stomp_config)

consumer.subscribe("/queue/trigger", :ack => "client") { |msg|
  # do_something stands in for the real processing; it returns the next payload.
  if new_data = do_something(msg)
    # ACK the current message and schedule the next one in a single transaction.
    txid = Time.now
    consumer.begin(txid)
    consumer.acknowledge(msg, :transaction => txid)
    consumer.publish("/queue/trigger", new_data, {
      :transaction => txid,
      :persistent => true,
      "AMQ_SCHEDULED_DELAY" => 5000
    })
    consumer.commit(txid)
  else
    consumer.unreceive(msg)
  end
}

# Block here so the script stays alive while the subscription runs.
consumer.join
== END example Ruby ==
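
To start the loop off, something has to be published to the "trigger" queue once. A rough sketch of that first publish, assuming the same stomp gem client as in the example above. The helper names scheduled_headers and kick_off are made up for illustration (they are not part of the stomp gem), and the client is passed in so that nothing here opens a connection by itself:

```ruby
# Hypothetical helpers (not part of the stomp gem) for seeding the loop.

# Build the headers for a persistent message that the ActiveMQ scheduler
# should hold back for delay_ms milliseconds before delivering it.
def scheduled_headers(delay_ms)
  { :persistent => true, "AMQ_SCHEDULED_DELAY" => delay_ms }
end

# Publish the first message to the trigger queue. `client` is anything that
# responds to publish(destination, body, headers), e.g. a Stomp::Client.
def kick_off(client, payload, delay_ms = 5000)
  client.publish("/queue/trigger", payload, scheduled_headers(delay_ms))
end
```

With a connected client this would be called as kick_off(Stomp::Client.new(stomp_config), "0"), where "0" is only a placeholder payload for the counter.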