Ok. there is a problem and a workaround. see:
https://issues.apache.org/activemq/browse/AMQ-2610
The test case[1] attached to the jira works as expected with the workaround.
The tests is based on the code posted by Scot.

As the file cursor and the queue share the same usage, (they are split for
vmcursors), the key to ensuring the cursor limit kicks in before the queue
memory limit (and producer flow controll) is to configure a cursor,
policy.setCursorMemoryHighWaterMark(50); which is less than the 70% value
used by the Queue. This ensures that it will spool messages to disk once 50%
of the system usage is reached.

Have a peek at the test case, [1]
https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java


On 16 February 2010 11:38, Gary Tully <gary.tu...@gmail.com> wrote:

> There is something not right here. let me build a test case to investigate
> a bit.
>
>
> On 16 February 2010 01:19, scot.hale <scot.h...@gmail.com> wrote:
>
>>
>> A.) I tried using the FilePendingQueueMessageStoragePolicy.  I assume that
>> this needs to be added to the queue destination policy specifically.
>> However I added it to default and Topic just to be sure (not shown here).
>>
>> I turned on flow control, but was unable to figure out what memory
>> settings
>> are needed.  What I gathered from your post is that I need to set the
>> queue
>> destination memory limit higher than the default SystemUsage memory limit.
>> Is that right?  For example:
>>
>>
>>
>>
>> brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>>
>> brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>>
>>        PolicyMap policyMap = new PolicyMap();
>>
>>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>
>>        PolicyEntry policy = new PolicyEntry();
>>        policy.setProducerFlowControl(true);
>>         policy.setPendingQueuePolicy(new
>> FilePendingQueueMessageStoragePolicy());
>>        policy.setQueue(">");
>>        policy.setMemoryLimit(64*1024*1024);
>>         entries.add(policy);
>>        policyMap.setPolicyEntries(entries);
>>
>>        brokerService.setDestinationPolicy(policyMap);
>>
>>
>> I tried it the other way around as well and it still stops (meaning
>> producers are blocked or resourceAllocationExceptions are thrown from
>> Queue.send()) when it gets to the lower of the two memory limits. I am
>> definitely missing something.
>>
>> B.) Would using the StorePendingQueueMessageStoragePolicy provide the same
>> behavior I am looking for?
>>
>> C.) I didn't understand the last sentence in your post.  Does this mean
>> that
>> when the brokerService.getSystemUsage().getTempUsage() is the disk usage
>> limit that should generate ResourceAllocationExceptions (assuming
>> SendFailIfNoSpace is set to true)?  In my configureation, it would mean
>> when
>> the 128MB is used up by the temp cursor references on disk, then no more
>> resources are available?
>>
>>
>>
>>
>>
>>
>> Gary Tully wrote:
>> >
>> > First thing is you need to use the FilePendingMessageStoragePolicy() as
>> > that
>> > will off load message references to the file system when
>> > SystemUsage.MemoryUsage limit is reached.
>> >
>> > So 1) add the following to the broker policy entry
>> >         PendingQueueMessageStoragePolicy pendingQueuePolicy = new
>> > FilePendingQueueMessageStoragePolicy();
>> >         policy.setPendingQueuePolicy(pendingQueuePolicy);
>> >
>> > With flow control on, you need to configure a lower SystemUsage as the
>> use
>> > of disk space by the file based cursors is determined by the shared
>> > SystemUsage.memoryLimit, which by default is the same value as the
>> memory
>> > limit for a destination. With a single destination, the flowcontroll
>> kicks
>> > in before the system usage so no spooling to disk occurs.
>> >
>> > 2) Configure a SystemUsage.MemoryLimit that is less than the default
>> > destination memory limit of 64M
>> >    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024
>> *
>> > 63);
>> >
>> > This should do it once you add a TempStore() limit to implement 5.
>> >
>> >
>> > On 15 February 2010 17:22, scot.hale <scot.h...@gmail.com> wrote:
>> >
>> >>
>> >> I am trying to setup a queue with the following requirements:
>> >>
>> >>
>> >> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
>> >> 1. ) VM Transport
>> >> 2. ) Persistent with KahaPersistenceAdaptor
>> >> 4. ) JVM Memory usage is capped at something like 64MB
>> >>        - When this limit is reached the producers should continue to
>> >> store
>> >> incoming messages to disk  (StoreBasedCursor or FileBasedCursor will
>> >> work,
>> >> since the former is the default that is the one I have been using.)
>> >> 5. ) File System usage is capped at something like 10GB
>> >>        - When this limit is reached the producers should start throwing
>> >> javax.jms.ResourceAllocationExceptions to the Producers
>> >>
>> >> Number 5 is the least important, as it will be difficult to fill up
>> disk
>> >> space in production. My current setup configures ActiveMQ
>> >> programatically.
>> >> I don't think this is introducing problems, let me know if there are
>> >> issues
>> >> with programatic configuration.
>> >>
>> >>
>> >> Default settings:
>> >>        If I do not configure the SystemUsage or the Flow control, then
>> >> 64MB
>> >> default memory usage is reached and the producers are halted even
>> though
>> >> the
>> >> queues are persistent and have much more space.  Should the default
>> >> StoreBasedCursor behave this way?
>> >>
>> >>
>> >> Turn off Flow Control:
>> >>        When I turn off Flow Control with default SystemUseage settings,
>> >> then the
>> >> JVM memory is not capped.  After about 5 million messages with no
>> >> consumers
>> >> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors.
>> >>
>> >>
>> >> So what setting do I need to cap the memory and allow the messages to
>> be
>> >> stored to disk even when the cap is reached?
>> >>
>> >>
>> >>
>> >> This is how I programtically configure my BrokerService
>> >>
>> >>        System.setProperty("defaultBinSize", "16384");//Only way to set
>> >> HashIndex bin size for KahaPersistenceAdapter
>> >>        try {
>> >>            uri = new URI("vm://"+brokerName);
>> >>        } catch (URISyntaxException e) {
>> >>            throw new RuntimeException(e);
>> >>        }
>> >>        brokerService = new BrokerService();
>> >>        brokerService.setBrokerName(brokerName);
>> >>        brokerService.setUseJmx(true);
>> >>        brokerService.setUseLoggingForShutdownErrors(true);
>> >>
>> >>
>> >>        PolicyMap policyMap = new PolicyMap();
>> >>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>> >>        PolicyEntry policy = new PolicyEntry();
>> >>        policy.setProducerFlowControl(true);
>> >>        policy.setQueue(">");
>> >>        entries.add(policy);
>> >>        policyMap.setPolicyEntries(entries);
>> >>        brokerService.setDestinationPolicy(policyMap);
>> >>
>> >>        //PERSISTENCE
>> >>        brokerService.setPersistent(true);
>> >>        KahaPersistenceAdapter persistenceAdapter = new
>> >> KahaPersistenceAdapter();
>> >>        persistenceAdapter.setDirectory(new
>> >> File("/tmp/activemq-"+brokerName+"/kaha"));
>> >>        brokerService.setDataDirectoryFile(new
>> >> File("/tmp/activemq-"+brokerName+"/data"));
>> >>        brokerService.setTmpDataDirectory(new
>> >> File("/tmp/activemq-"+brokerName+"/temp"));
>> >>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>> >>
>> >>        try {
>> >>            brokerService.setPersistenceAdapter(persistenceAdapter);
>> >>        } catch (IOException e) {
>> >>            throw new RuntimeException(e);
>> >>        }
>> >>        try {
>> >>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>> >>            brokerService.addConnector(uri);
>> >>            brokerService.start();
>> >>        } catch (Exception e) {
>> >>            throw new RuntimeException(e);
>> >>        }
>> >>
>> >>
>> >>
>> >> Here is a Producer:
>> >>
>> >> public class Producer implements Runnable{
>> >>
>> >>    private BrokerService brokerService;
>> >>    private long numberOfMessages;
>> >>
>> >>    public Producer(BrokerService brokerService, long n){
>> >>        this.brokerService = brokerService;
>> >>        this.numberOfMessages = n;
>> >>    }
>> >>
>> >>    public void run(){
>> >>        ActiveMQConnectionFactory factory = new
>> >> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>> >>        try {
>> >>            Connection conn = factory.createConnection();
>> >>            conn.start();
>> >>            for (int i = 0; i < numberOfMessages; i++) {
>> >>                Session session = conn.createSession(false,
>> >> Session.AUTO_ACKNOWLEDGE);
>> >>                Destination destination =
>> >> session.createQueue("test-queue");
>> >>                MessageProducer producer =
>> >> session.createProducer(destination);
>> >>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>> >>                BytesMessage message = session.createBytesMessage();
>> >>                message.writeBytes(new
>> >>
>> >>
>> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>> >>                try {
>> >>                    producer.send(message);
>> >>                } catch (ResourceAllocationException e) {
>> >>                    e.printStackTrace();
>> >>                }
>> >>                session.close();
>> >>            }
>> >>        } catch (JMSException e) {
>> >>            throw new RuntimeException(e);
>> >>         }
>> >>
>> >>    }
>> >> }
>> >>
>> >>
>> >> rajdavies wrote:
>> >> >
>> >> > Hi Scott,
>> >> >
>> >> > just change the below config to enable flow control - i.e:
>> >> >
>> >> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>> >> >   <policyEntry queue=">" producerFlowControl="true"
>> memoryLimit="1mb">
>> >> >
>> >> > in 5.3 - producerFlowControl is on by default - so just remove the
>> >> > producerFlowControl entry from your configuration.
>> >> >
>> >> > If this all sounds double dutch - send in your config - and we'll
>> help
>> >> > with the correct settings :)
>> >> >
>> >> >
>> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
>> >> >
>> >> >>
>> >> >> Fred,
>> >> >>
>> >> >> Were you able to configure ActiveMQ to grow without surpassing the
>> >> >> memory
>> >> >> setting?  I am trying to figure out how to do the same thing.
>> >> >>
>> >> >> -Scot
>> >> >>
>> >> >>
>> >> >> Fred Moore-3 wrote:
>> >> >>>
>> >> >>> Hi,
>> >> >>>
>> >> >>> going back to Cursors and
>> >> >>>
>> >>
>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>> >> >>> ...
>> >> >>>
>> >> >>> ...can anyone shed some light on the actual role of memoryLimit in:
>> >> >>>   <policyEntry topic=">" producerFlowControl="false"
>> >> >>> memoryLimit="1mb">
>> >> >>>   <policyEntry queue=">" producerFlowControl="false"
>> >> >>> memoryLimit="1mb">
>> >> >>>
>> >> >>> ...moreover: *when* will producerFlowControl start slowing down
>> >> >>> consumers?
>> >> >>>
>> >> >>> Cheers,
>> >> >>> F.
>> >> >>>
>> >> >>>
>> >> >>
>> >> >> --
>> >> >> View this message in context:
>> >> >>
>> >>
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >> >>
>> >> >
>> >> > Rob Davies
>> >> > http://twitter.com/rajdavies
>> >> > I work here: http://fusesource.com
>> >> > My Blog: http://rajdavies.blogspot.com/
>> >> > I'm writing this: http://www.manning.com/snyder/
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
>> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>
>> >>
>> >
>> >
>> > --
>> > http://blog.garytully.com
>> >
>> > Open Source Integration
>> > http://fusesource.com
>> >
>> >
>>
>> --
>> View this message in context:
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
>
>
> --
> http://blog.garytully.com
>
> Open Source Integration
> http://fusesource.com
>



-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Reply via email to