I went back to 5.1, and this behavior seems to work well by default with the
KahaPersistenceAdapter. I assume it is using the store-based cursor. I
didn't configure a PolicyEntry. Memory usage is capped at 64MB, which is
the default cap. I will use 5.1 as my workaround for now.
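
A rough sketch of the arithmetic behind the workaround quoted below, in plain
Java rather than ActiveMQ code. It assumes both percentages (the 50 passed to
setCursorMemoryHighWaterMark and the 70% value quoted for the Queue) are taken
against the same 64MB limit. The class and method names are made up for
illustration.

```java
public class UsageThresholds {

    // Hypothetical helper: number of bytes at which a percentage mark is crossed.
    static long thresholdBytes(long limitBytes, int percent) {
        return limitBytes * percent / 100;
    }

    public static void main(String[] args) {
        long limit = 64L * 1024 * 1024;               // assumed shared limit (64MB)
        long cursorMark = thresholdBytes(limit, 50);  // cursor high-water mark from the workaround
        long queueMark = thresholdBytes(limit, 70);   // 70% value quoted for the Queue

        System.out.println("cursor mark: " + cursorMark + " bytes");
        System.out.println("queue mark:  " + queueMark + " bytes");
        System.out.println("cursor mark is lower: " + (cursorMark < queueMark));
    }
}
```

Under these assumed numbers the cursor mark (32MB) sits below the 70% mark
(about 44.8MB), which is the ordering the workaround relies on.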



scot.hale wrote:
> 
> Should this work in 5.3.0, or do we need to wait for 5.4.0 to be released?  
> 
> 
> 
> 
> 
> Gary Tully wrote:
>> 
>> OK, there is a problem and a workaround. See:
>> https://issues.apache.org/activemq/browse/AMQ-2610
>> The test case [1] attached to the JIRA works as expected with the
>> workaround.
>> The test is based on the code posted by Scot.
>> 
>> As the file cursor and the queue share the same usage (they are split
>> for VM cursors), the key to ensuring the cursor limit kicks in before the
>> queue memory limit (and producer flow control) is to configure a cursor
>> limit with policy.setCursorMemoryHighWaterMark(50), which is less than
>> the 70% value used by the Queue. This ensures that it will spool messages
>> to disk once 50% of the system usage is reached.
>> 
>> Have a peek at the test case, [1]
>> https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java
>> 
>> 
>> On 16 February 2010 11:38, Gary Tully <gary.tu...@gmail.com> wrote:
>> 
>>> There is something not right here. Let me build a test case to
>>> investigate a bit.
>>>
>>>
>>> On 16 February 2010 01:19, scot.hale <scot.h...@gmail.com> wrote:
>>>
>>>>
>>>> A.) I tried using the FilePendingQueueMessageStoragePolicy. I assume
>>>> that this needs to be added to the queue destination policy
>>>> specifically. However, I added it to default and Topic just to be sure
>>>> (not shown here).
>>>>
>>>> I turned on flow control, but was unable to figure out what memory
>>>> settings are needed. What I gathered from your post is that I need to
>>>> set the queue destination memory limit higher than the default
>>>> SystemUsage memory limit. Is that right? For example:
>>>>
>>>>
>>>>
>>>>
>>>>        brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>>>>        brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>>>>
>>>>        PolicyMap policyMap = new PolicyMap();
>>>>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>>>
>>>>        PolicyEntry policy = new PolicyEntry();
>>>>        policy.setProducerFlowControl(true);
>>>>        policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
>>>>        policy.setQueue(">");
>>>>        policy.setMemoryLimit(64*1024*1024);
>>>>        entries.add(policy);
>>>>        policyMap.setPolicyEntries(entries);
>>>>
>>>>        brokerService.setDestinationPolicy(policyMap);
>>>>
>>>>
>>>> I tried it the other way around as well, and it still stops (meaning
>>>> producers are blocked or ResourceAllocationExceptions are thrown from
>>>> Queue.send()) when it gets to the lower of the two memory limits. I am
>>>> definitely missing something.
>>>>
>>>> B.) Would using the StorePendingQueueMessageStoragePolicy provide the
>>>> same behavior I am looking for?
>>>>
>>>> C.) I didn't understand the last sentence in your post. Does this mean
>>>> that brokerService.getSystemUsage().getTempUsage() is the disk usage
>>>> limit that should generate ResourceAllocationExceptions (assuming
>>>> SendFailIfNoSpace is set to true)? In my configuration, would that mean
>>>> that when the 128MB is used up by the temp cursor references on disk,
>>>> no more resources are available?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Gary Tully wrote:
>>>> >
>>>> > First thing is you need to use the
>>>> > FilePendingQueueMessageStoragePolicy(), as that will offload message
>>>> > references to the file system when the SystemUsage.MemoryUsage limit
>>>> > is reached.
>>>> >
>>>> > So 1) add the following to the broker policy entry:
>>>> >         PendingQueueMessageStoragePolicy pendingQueuePolicy =
>>>> >             new FilePendingQueueMessageStoragePolicy();
>>>> >         policy.setPendingQueuePolicy(pendingQueuePolicy);
>>>> >
>>>> > With flow control on, you need to configure a lower SystemUsage, as
>>>> > the use of disk space by the file-based cursors is determined by the
>>>> > shared SystemUsage.memoryLimit, which by default is the same value as
>>>> > the memory limit for a destination. With a single destination, the
>>>> > flow control kicks in before the system usage, so no spooling to disk
>>>> > occurs.
>>>> >
>>>> > 2) Configure a SystemUsage.MemoryLimit that is less than the default
>>>> > destination memory limit of 64M:
>>>> >    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 63);
>>>> >
>>>> > This should do it once you add a TempStore() limit to implement 5.
>>>> >
>>>> >
>>>> > On 15 February 2010 17:22, scot.hale <scot.h...@gmail.com> wrote:
>>>> >
>>>> >>
>>>> >> I am trying to setup a queue with the following requirements:
>>>> >>
>>>> >>
>>>> >> ActiveMQ 5.1 or 5.3 (I have been testing with 5.3)
>>>> >> 1. ) VM Transport
>>>> >> 2. ) Persistent with KahaPersistenceAdapter
>>>> >> 4. ) JVM memory usage is capped at something like 64MB
>>>> >>        - When this limit is reached, the producers should continue to
>>>> >>          store incoming messages to disk. (StoreBasedCursor or
>>>> >>          FileBasedCursor will work; since the former is the default,
>>>> >>          that is the one I have been using.)
>>>> >> 5. ) File system usage is capped at something like 10GB
>>>> >>        - When this limit is reached, the producers should start
>>>> >>          receiving javax.jms.ResourceAllocationExceptions.
>>>> >>
>>>> >> Number 5 is the least important, as it will be difficult to fill up
>>>> >> disk space in production. My current setup configures ActiveMQ
>>>> >> programmatically. I don't think this is introducing problems; let me
>>>> >> know if there are issues with programmatic configuration.
>>>> >>
>>>> >>
>>>> >> Default settings:
>>>> >>        If I do not configure the SystemUsage or the flow control, then
>>>> >> the 64MB default memory usage is reached and the producers are halted,
>>>> >> even though the queues are persistent and have much more space. Should
>>>> >> the default StoreBasedCursor behave this way?
>>>> >>
>>>> >>
>>>> >> Turn off Flow Control:
>>>> >>        When I turn off flow control with default SystemUsage settings,
>>>> >> the JVM memory is not capped. After about 5 million messages with no
>>>> >> consumers, the JVM, which is assigned 1GB of memory, starts throwing
>>>> >> OutOfMemoryErrors.
>>>> >>
>>>> >>
>>>> >> So what setting do I need to cap the memory and allow the messages to
>>>> >> be stored to disk even when the cap is reached?
>>>> >>
>>>> >>
>>>> >>
>>>> >> This is how I programmatically configure my BrokerService:
>>>> >>
>>>> >>        // Only way to set HashIndex bin size for KahaPersistenceAdapter
>>>> >>        System.setProperty("defaultBinSize", "16384");
>>>> >>        try {
>>>> >>            uri = new URI("vm://"+brokerName);
>>>> >>        } catch (URISyntaxException e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>        }
>>>> >>        brokerService = new BrokerService();
>>>> >>        brokerService.setBrokerName(brokerName);
>>>> >>        brokerService.setUseJmx(true);
>>>> >>        brokerService.setUseLoggingForShutdownErrors(true);
>>>> >>
>>>> >>
>>>> >>        PolicyMap policyMap = new PolicyMap();
>>>> >>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>>> >>        PolicyEntry policy = new PolicyEntry();
>>>> >>        policy.setProducerFlowControl(true);
>>>> >>        policy.setQueue(">");
>>>> >>        entries.add(policy);
>>>> >>        policyMap.setPolicyEntries(entries);
>>>> >>        brokerService.setDestinationPolicy(policyMap);
>>>> >>
>>>> >>        //PERSISTENCE
>>>> >>        brokerService.setPersistent(true);
>>>> >>        KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
>>>> >>        persistenceAdapter.setDirectory(
>>>> >>            new File("/tmp/activemq-"+brokerName+"/kaha"));
>>>> >>        brokerService.setDataDirectoryFile(
>>>> >>            new File("/tmp/activemq-"+brokerName+"/data"));
>>>> >>        brokerService.setTmpDataDirectory(
>>>> >>            new File("/tmp/activemq-"+brokerName+"/temp"));
>>>> >>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>>>> >>
>>>> >>        try {
>>>> >>            brokerService.setPersistenceAdapter(persistenceAdapter);
>>>> >>        } catch (IOException e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>        }
>>>> >>        try {
>>>> >>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>>>> >>            brokerService.addConnector(uri);
>>>> >>            brokerService.start();
>>>> >>        } catch (Exception e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>        }
>>>> >>
>>>> >>
>>>> >>
>>>> >> Here is a Producer:
>>>> >>
>>>> >> public class Producer implements Runnable{
>>>> >>
>>>> >>    private BrokerService brokerService;
>>>> >>    private long numberOfMessages;
>>>> >>
>>>> >>    public Producer(BrokerService brokerService, long n){
>>>> >>        this.brokerService = brokerService;
>>>> >>        this.numberOfMessages = n;
>>>> >>    }
>>>> >>
>>>> >>    public void run(){
>>>> >>        ActiveMQConnectionFactory factory =
>>>> >>            new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>>>> >>        try {
>>>> >>            Connection conn = factory.createConnection();
>>>> >>            conn.start();
>>>> >>            for (int i = 0; i < numberOfMessages; i++) {
>>>> >>                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>>> >>                Destination destination = session.createQueue("test-queue");
>>>> >>                MessageProducer producer = session.createProducer(destination);
>>>> >>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>>> >>                BytesMessage message = session.createBytesMessage();
>>>> >>                message.writeBytes(new
>>>> >> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>>>> >>                try {
>>>> >>                    producer.send(message);
>>>> >>                } catch (ResourceAllocationException e) {
>>>> >>                    e.printStackTrace();
>>>> >>                }
>>>> >>                session.close();
>>>> >>            }
>>>> >>        } catch (JMSException e) {
>>>> >>            throw new RuntimeException(e);
>>>> >>         }
>>>> >>
>>>> >>    }
>>>> >> }
>>>> >>
>>>> >>
>>>> >> rajdavies wrote:
>>>> >> >
>>>> >> > Hi Scott,
>>>> >> >
>>>> >> > just change the below config to enable flow control - i.e:
>>>> >> >
>>>> >> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>>>> >> > <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
>>>> >> >
>>>> >> > In 5.3, producerFlowControl is on by default, so just remove the
>>>> >> > producerFlowControl entry from your configuration.
>>>> >> >
>>>> >> > If this all sounds double Dutch, send in your config and we'll
>>>> >> > help with the correct settings :)
>>>> >> >
>>>> >> >
>>>> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
>>>> >> >
>>>> >> >>
>>>> >> >> Fred,
>>>> >> >>
>>>> >> >> Were you able to configure ActiveMQ to grow without surpassing
>>>> >> >> the memory setting? I am trying to figure out how to do the same
>>>> >> >> thing.
>>>> >> >>
>>>> >> >> -Scot
>>>> >> >>
>>>> >> >>
>>>> >> >> Fred Moore-3 wrote:
>>>> >> >>>
>>>> >> >>> Hi,
>>>> >> >>>
>>>> >> >>> going back to Cursors and
>>>> >> >>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>>>> >> >>> ...
>>>> >> >>>
>>>> >> >>> ...can anyone shed some light on the actual role of memoryLimit
>>>> >> >>> in:
>>>> >> >>>   <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb">
>>>> >> >>>   <policyEntry queue=">" producerFlowControl="false" memoryLimit="1mb">
>>>> >> >>>
>>>> >> >>> ...moreover: *when* will producerFlowControl start slowing down
>>>> >> >>> consumers?
>>>> >> >>>
>>>> >> >>> Cheers,
>>>> >> >>> F.
>>>> >> >>>
>>>> >> >>>
>>>> >> >>
>>>> >> >> --
>>>> >> >> View this message in context:
>>>> >> >> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>>>> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>> >> >>
>>>> >> >
>>>> >> > Rob Davies
>>>> >> > http://twitter.com/rajdavies
>>>> >> > I work here: http://fusesource.com
>>>> >> > My Blog: http://rajdavies.blogspot.com/
>>>> >> > I'm writing this: http://www.manning.com/snyder/
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >>
>>>> >>
>>>> >>
>>>> >
>>>> >
>>>> > --
>>>> > http://blog.garytully.com
>>>> >
>>>> > Open Source Integration
>>>> > http://fusesource.com
>>>> >
>>>> >
>>>>
>>>>
>>>>
>>>
>>>
>>>
>> 
>> 
>> 
>> 
>> 
> 
> 

-- 
View this message in context: 
http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27617794.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
