I'm using ActiveMQ 5.5.1 in a pure master/slave configuration. Both master
and slave have schedulerSupport="true" configured.

I submit a message to the master via the STOMP connector, for delayed
delivery (in my case, 5000ms later). The message is accepted, and the
scheduled delivery occurs. On the master, the subscribed consumer receives
the message, processes it, ACKs it, and schedules another for 5000ms hence.
The reason for this (rather than just using a cron scheduler) is that an
incremented counter (non-monotonic) needs to be passed through the message.
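
For reference, this is roughly what the delayed send looks like on the wire.
It is only a minimal sketch assuming STOMP 1.0 framing; scheduled_send_frame
is a hypothetical helper for illustration (the stomp gem builds the real
frame in my code), and only the AMQ_SCHEDULED_DELAY and persistent headers
come from my actual setup:

```ruby
# Hypothetical helper: builds a STOMP 1.0 SEND frame that asks ActiveMQ
# to delay delivery by delay_ms milliseconds via AMQ_SCHEDULED_DELAY.
def scheduled_send_frame(destination, body, delay_ms)
  headers = {
    "destination"         => destination,
    "persistent"          => "true",
    "AMQ_SCHEDULED_DELAY" => delay_ms.to_s
  }
  header_lines = headers.map { |key, value| "#{key}:#{value}" }.join("\n")
  # A frame is: command, header lines, a blank line, the body, then a NUL byte.
  "SEND\n#{header_lines}\n\n#{body}\0"
end

frame = scheduled_send_frame("/queue/trigger", "42", 5000)
puts frame.inspect
```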

On the slave, the replicated queue shows an ever-growing number of messages
which never get consumed.

I'm concerned that either the slave will run out of memory, or that in the
event of a master failure, the slave will have a backlog of potentially
millions of messages which would be reprocessed.

Presumably this isn't normal behaviour ... O halp?

My master and slave configurations are below, with example code (in Ruby) to
trigger the behaviour (to start it off, publish something to the "trigger"
queue):

== Begin MASTER config XML ==
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="mq01.a"
            dataDirectory="${activemq.base}/data"
            schedulerSupport="true"
            destroyApplicationContextOnStop="true">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
                             memoryLimit="20mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
                             memoryLimit="20mb">
                  <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="DLQ."
                                                  useQueueForQueueMessages="true"/>
                  </deadLetterStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <networkConnectors>
            <networkConnector name="AtoBfrom01"
                uri="static:failover:(tcp://mq01.b:61616,tcp://mq02.b:61616)?maxReconnectAttempts=1&amp;randomize=true"
                dynamicOnly="true"
                duplex="true"
                conduitSubscriptions="false"/>
        </networkConnectors>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="1 gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="0"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="500 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
            <transportConnector name="stomp"
                uri="stomp://0.0.0.0:61612?transport.closeAsync=false"/>
            <transportConnector name="stomp+nio"
                uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
        </transportConnectors>

    </broker>
== End MASTER config XML ==

== Begin SLAVE config XML ==
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="mq02.a"
            masterConnectorURI="tcp://mq01.a:61616"
            dataDirectory="${activemq.base}/data"
            schedulerSupport="true"
            shutdownOnMasterFailure="false"
            destroyApplicationContextOnStop="true">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
                             memoryLimit="20mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
                             memoryLimit="20mb">
                  <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="DLQ."
                                                  useQueueForQueueMessages="true"/>
                  </deadLetterStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <networkConnectors>
            <networkConnector name="AtoBfrom02"
                uri="static:failover:(tcp://mq01.b:61616,tcp://mq02.b:61616)?maxReconnectAttempts=1&amp;randomize=true"
                dynamicOnly="true"
                duplex="true"
                conduitSubscriptions="false"/>
        </networkConnectors>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="1 gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="0"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="500 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
            <transportConnector name="stomp"
                uri="stomp://0.0.0.0:61612?transport.closeAsync=false"/>
            <transportConnector name="stomp+nio"
                uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
        </transportConnectors>

    </broker>
== End SLAVE config XML ==

== Begin example Ruby ==
#!/usr/bin/ruby

require 'rubygems'
require 'stomp'

stomp_config = {
  :hosts => [
    { :host => "mq01", :port => 61613, :ssl => false },
    { :host => "mq02", :port => 61613, :ssl => false }
  ],
  :randomize => false
}

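consumer = Stomp::Client.new(stomp_config)
consumer.subscribe("/queue/trigger", :ack => "client") { |msg|
  # do_something is application code (not shown here); it returns the next
  # payload, or nil/false if the message could not be processed.
  if new_data = do_something(msg)
    # ACK the current message and schedule its successor in one transaction.
    txid = Time.now.to_f.to_s
    consumer.begin(txid)
    consumer.acknowledge(msg, :transaction => txid)
    consumer.publish("/queue/trigger", new_data, {
        :transaction => txid,
        :persistent => true,
        "AMQ_SCHEDULED_DELAY" => 5000  # delay in milliseconds
      }
    )
    consumer.commit(txid)
  else
    # Processing failed: hand the message back for redelivery.
    consumer.unreceive(msg)
  end
}

# Block here so the subscription callback keeps running.
consumer.join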
== End example Ruby ==
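
The do_something call in the script is a placeholder for my application
code. To run the example standalone, something like the sketch below could
stand in for it. It is purely an assumption for illustration: it treats the
message body as an integer counter and simply adds one, which is not my real
processing, and FakeMessage is a hypothetical stand-in for a received
message so the snippet runs without a broker:

```ruby
# Hypothetical stand-in for a received STOMP message; only #body is used.
FakeMessage = Struct.new(:body)

# Hypothetical do_something: parse the body as a base-10 integer counter and
# return the next value as a string, or nil if the body is not a number
# (the subscriber then takes the unreceive branch).
def do_something(msg)
  counter = Integer(msg.body.to_s.strip, 10)
  (counter + 1).to_s
rescue ArgumentError
  nil
end

puts do_something(FakeMessage.new("41")).inspect
puts do_something(FakeMessage.new("not a number")).inspect
```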

--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Pure-master-slave-duplicates-delayed-messages-tp4212631p4212631.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
