I went back to 5.1 and this behavior seems to work well by default with the KahaPersistenceAdapter. I assume it is using the Store Based Cursor. I didn't configure a PolicyEntry. Memory usage is capped at 64MB, which is the default cap. I will use 5.1 as my work around for now.
scot.hale wrote: > > Should this work in 5.3.0, or do we need to wait for 5.4.0 to be released? > > > > > > Gary Tully wrote: >> >> Ok. there is a problem and a workaround. see: >> https://issues.apache.org/activemq/browse/AMQ-2610 >> The test case[1] attached to the jira works as expected with the >> workaround. >> The tests is based on the code posted by Scot. >> >> As the file cursor and the queue share the same usage, (they are split >> for >> vmcursors), the key to ensuring the cursor limit kicks in before the >> queue >> memory limit (and producer flow controll) is to configure a cursor, >> policy.setCursorMemoryHighWaterMark(50); which is less than the 70% value >> used by the Queue. This ensures that it will spool messages to disk once >> 50% >> of the system usage is reached. >> >> Have a peek at the test case, [1] >> https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java >> >> >> On 16 February 2010 11:38, Gary Tully <gary.tu...@gmail.com> wrote: >> >>> There is something not right here. let me build a test case to >>> investigate >>> a bit. >>> >>> >>> On 16 February 2010 01:19, scot.hale <scot.h...@gmail.com> wrote: >>> >>>> >>>> A.) I tried using the FilePendingQueueMessageStoragePolicy. I assume >>>> that >>>> this needs to be added to the queue destination policy specifically. >>>> However I added it to default and Topic just to be sure (not shown >>>> here). >>>> >>>> I turned on flow control, but was unable to figure out what memory >>>> settings >>>> are needed. What I gathered from your post is that I need to set the >>>> queue >>>> destination memory limit higher than the default SystemUsage memory >>>> limit. >>>> Is that right? For example: >>>> >>>> >>>> >>>> >>>> brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024); >>>> >>>> brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024); >>>> >>>> PolicyMap policyMap = new PolicyMap(); >>>> >>>> List<PolicyEntry> entries = new ArrayList<PolicyEntry>(); >>>> >>>> PolicyEntry policy = new PolicyEntry(); >>>> policy.setProducerFlowControl(true); >>>> policy.setPendingQueuePolicy(new >>>> FilePendingQueueMessageStoragePolicy()); >>>> policy.setQueue(">"); >>>> policy.setMemoryLimit(64*1024*1024); >>>> entries.add(policy); >>>> policyMap.setPolicyEntries(entries); >>>> >>>> brokerService.setDestinationPolicy(policyMap); >>>> >>>> >>>> I tried it the other way around as well and it still stops (meaning >>>> producers are blocked or resourceAllocationExceptions are thrown from >>>> Queue.send()) when it gets to the lower of the two memory limits. I am >>>> definitely missing something. >>>> >>>> B.) Would using the StorePendingQueueMessageStoragePolicy provide the >>>> same >>>> behavior I am looking for? >>>> >>>> C.) I didn't understand the last sentence in your post. Does this mean >>>> that >>>> when the brokerService.getSystemUsage().getTempUsage() is the disk >>>> usage >>>> limit that should generate ResourceAllocationExceptions (assuming >>>> SendFailIfNoSpace is set to true)? In my configureation, it would mean >>>> when >>>> the 128MB is used up by the temp cursor references on disk, then no >>>> more >>>> resources are available? >>>> >>>> >>>> >>>> >>>> >>>> >>>> Gary Tully wrote: >>>> > >>>> > First thing is you need to use the FilePendingMessageStoragePolicy() >>>> as >>>> > that >>>> > will off load message references to the file system when >>>> > SystemUsage.MemoryUsage limit is reached. >>>> > >>>> > So 1) add the following to the broker policy entry >>>> > PendingQueueMessageStoragePolicy pendingQueuePolicy = new >>>> > FilePendingQueueMessageStoragePolicy(); >>>> > policy.setPendingQueuePolicy(pendingQueuePolicy); >>>> > >>>> > With flow control on, you need to configure a lower SystemUsage as >>>> the >>>> use >>>> > of disk space by the file based cursors is determined by the shared >>>> > SystemUsage.memoryLimit, which by default is the same value as the >>>> memory >>>> > limit for a destination. With a single destination, the flowcontroll >>>> kicks >>>> > in before the system usage so no spooling to disk occurs. >>>> > >>>> > 2) Configure a SystemUsage.MemoryLimit that is less than the default >>>> > destination memory limit of 64M >>>> > brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * >>>> 1024 >>>> * >>>> > 63); >>>> > >>>> > This should do it once you add a TempStore() limit to implement 5. >>>> > >>>> > >>>> > On 15 February 2010 17:22, scot.hale <scot.h...@gmail.com> wrote: >>>> > >>>> >> >>>> >> I am trying to setup a queue with the following requirements: >>>> >> >>>> >> >>>> >> ActiveMQ 5.1 or 5.3 ( I have been testing with 5.3 ) >>>> >> 1. ) VM Transport >>>> >> 2. ) Persistent with KahaPersistenceAdaptor >>>> >> 4. ) JVM Memory usage is capped at something like 64MB >>>> >> - When this limit is reached the producers should continue to >>>> >> store >>>> >> incoming messages to disk (StoreBasedCursor or FileBasedCursor will >>>> >> work, >>>> >> since the former is the default that is the one I have been using.) >>>> >> 5. ) File System usage is capped at something like 10GB >>>> >> - When this limit is reached the producers should start >>>> throwing >>>> >> javax.jms.ResourceAllocationExceptions to the Producers >>>> >> >>>> >> Number 5 is the least important, as it will be difficult to fill up >>>> disk >>>> >> space in production. My current setup configures ActiveMQ >>>> >> programatically. >>>> >> I don't think this is introducing problems, let me know if there are >>>> >> issues >>>> >> with programatic configuration. >>>> >> >>>> >> >>>> >> Default settings: >>>> >> If I do not configure the SystemUsage or the Flow control, >>>> then >>>> >> 64MB >>>> >> default memory usage is reached and the producers are halted even >>>> though >>>> >> the >>>> >> queues are persistent and have much more space. Should the default >>>> >> StoreBasedCursor behave this way? >>>> >> >>>> >> >>>> >> Turn off Flow Control: >>>> >> When I turn off Flow Control with default SystemUseage >>>> settings, >>>> >> then the >>>> >> JVM memory is not capped. After about 5 million messages with no >>>> >> consumers >>>> >> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors. >>>> >> >>>> >> >>>> >> So what setting do I need to cap the memory and allow the messages >>>> to >>>> be >>>> >> stored to disk even when the cap is reached? >>>> >> >>>> >> >>>> >> >>>> >> This is how I programtically configure my BrokerService >>>> >> >>>> >> System.setProperty("defaultBinSize", "16384");//Only way to >>>> set >>>> >> HashIndex bin size for KahaPersistenceAdapter >>>> >> try { >>>> >> uri = new URI("vm://"+brokerName); >>>> >> } catch (URISyntaxException e) { >>>> >> throw new RuntimeException(e); >>>> >> } >>>> >> brokerService = new BrokerService(); >>>> >> brokerService.setBrokerName(brokerName); >>>> >> brokerService.setUseJmx(true); >>>> >> brokerService.setUseLoggingForShutdownErrors(true); >>>> >> >>>> >> >>>> >> PolicyMap policyMap = new PolicyMap(); >>>> >> List<PolicyEntry> entries = new ArrayList<PolicyEntry>(); >>>> >> PolicyEntry policy = new PolicyEntry(); >>>> >> policy.setProducerFlowControl(true); >>>> >> policy.setQueue(">"); >>>> >> entries.add(policy); >>>> >> policyMap.setPolicyEntries(entries); >>>> >> brokerService.setDestinationPolicy(policyMap); >>>> >> >>>> >> //PERSISTENCE >>>> >> brokerService.setPersistent(true); >>>> >> KahaPersistenceAdapter persistenceAdapter = new >>>> >> KahaPersistenceAdapter(); >>>> >> persistenceAdapter.setDirectory(new >>>> >> File("/tmp/activemq-"+brokerName+"/kaha")); >>>> >> brokerService.setDataDirectoryFile(new >>>> >> File("/tmp/activemq-"+brokerName+"/data")); >>>> >> brokerService.setTmpDataDirectory(new >>>> >> File("/tmp/activemq-"+brokerName+"/temp")); >>>> >> persistenceAdapter.setMaxDataFileLength(500L*1024*1024); >>>> >> >>>> >> try { >>>> >> brokerService.setPersistenceAdapter(persistenceAdapter); >>>> >> } catch (IOException e) { >>>> >> throw new RuntimeException(e); >>>> >> } >>>> >> try { >>>> >> >>>> brokerService.getSystemUsage().setSendFailIfNoSpace(true); >>>> >> brokerService.addConnector(uri); >>>> >> brokerService.start(); >>>> >> } catch (Exception e) { >>>> >> throw new RuntimeException(e); >>>> >> } >>>> >> >>>> >> >>>> >> >>>> >> Here is a Producer: >>>> >> >>>> >> public class Producer implements Runnable{ >>>> >> >>>> >> private BrokerService brokerService; >>>> >> private long numberOfMessages; >>>> >> >>>> >> public Producer(BrokerService brokerService, long n){ >>>> >> this.brokerService = brokerService; >>>> >> this.numberOfMessages = n; >>>> >> } >>>> >> >>>> >> public void run(){ >>>> >> ActiveMQConnectionFactory factory = new >>>> >> ActiveMQConnectionFactory(brokerService.getVmConnectorURI()); >>>> >> try { >>>> >> Connection conn = factory.createConnection(); >>>> >> conn.start(); >>>> >> for (int i = 0; i < numberOfMessages; i++) { >>>> >> Session session = conn.createSession(false, >>>> >> Session.AUTO_ACKNOWLEDGE); >>>> >> Destination destination = >>>> >> session.createQueue("test-queue"); >>>> >> MessageProducer producer = >>>> >> session.createProducer(destination); >>>> >> producer.setDeliveryMode(DeliveryMode.PERSISTENT); >>>> >> BytesMessage message = session.createBytesMessage(); >>>> >> message.writeBytes(new >>>> >> >>>> >> >>>> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,}); >>>> >> try { >>>> >> producer.send(message); >>>> >> } catch (ResourceAllocationException e) { >>>> >> e.printStackTrace(); >>>> >> } >>>> >> session.close(); >>>> >> } >>>> >> } catch (JMSException e) { >>>> >> throw new RuntimeException(e); >>>> >> } >>>> >> >>>> >> } >>>> >> } >>>> >> >>>> >> >>>> >> rajdavies wrote: >>>> >> > >>>> >> > Hi Scott, >>>> >> > >>>> >> > just change the below config to enable flow control - i.e: >>>> >> > >>>> >> > <policyEntry topic=">" producerFlowControl="true" >>>> memoryLimit="1mb"> >>>> >> > <policyEntry queue=">" producerFlowControl="true" >>>> memoryLimit="1mb"> >>>> >> > >>>> >> > in 5.3 - producerFlowControl is on by default - so just remove the >>>> >> > producerFlowControl entry from your configuration. >>>> >> > >>>> >> > If this all sounds double dutch - send in your config - and we'll >>>> help >>>> >> > with the correct settings :) >>>> >> > >>>> >> > >>>> >> > On 12 Feb 2010, at 20:49, scot.hale wrote: >>>> >> > >>>> >> >> >>>> >> >> Fred, >>>> >> >> >>>> >> >> Were you able to configure ActiveMQ to grow without surpassing >>>> the >>>> >> >> memory >>>> >> >> setting? I am trying to figure out how to do the same thing. >>>> >> >> >>>> >> >> -Scot >>>> >> >> >>>> >> >> >>>> >> >> Fred Moore-3 wrote: >>>> >> >>> >>>> >> >>> Hi, >>>> >> >>> >>>> >> >>> going back to Cursors and >>>> >> >>> >>>> >> >>>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html >>>> >> >>> ... >>>> >> >>> >>>> >> >>> ...can anyone shed some light on the actual role of memoryLimit >>>> in: >>>> >> >>> <policyEntry topic=">" producerFlowControl="false" >>>> >> >>> memoryLimit="1mb"> >>>> >> >>> <policyEntry queue=">" producerFlowControl="false" >>>> >> >>> memoryLimit="1mb"> >>>> >> >>> >>>> >> >>> ...moreover: *when* will producerFlowControl start slowing down >>>> >> >>> consumers? >>>> >> >>> >>>> >> >>> Cheers, >>>> >> >>> F. >>>> >> >>> >>>> >> >>> >>>> >> >> >>>> >> >> -- >>>> >> >> View this message in context: >>>> >> >> >>>> >> >>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html >>>> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >>>> >> >> >>>> >> > >>>> >> > Rob Davies >>>> >> > http://twitter.com/rajdavies >>>> >> > I work here: http://fusesource.com >>>> >> > My Blog: http://rajdavies.blogspot.com/ >>>> >> > I'm writing this: http://www.manning.com/snyder/ >>>> >> > >>>> >> > >>>> >> > >>>> >> > >>>> >> > >>>> >> > >>>> >> > >>>> >> >>>> >> -- >>>> >> View this message in context: >>>> >> >>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html >>>> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >>>> >> >>>> >> >>>> > >>>> > >>>> > -- >>>> > http://blog.garytully.com >>>> > >>>> > Open Source Integration >>>> > http://fusesource.com >>>> > >>>> > >>>> >>>> -- >>>> View this message in context: >>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html >>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >>>> >>>> >>> >>> >>> -- >>> http://blog.garytully.com >>> >>> Open Source Integration >>> http://fusesource.com >>> >> >> >> >> -- >> http://blog.garytully.com >> >> Open Source Integration >> http://fusesource.com >> >> > > -- View this message in context: http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27617794.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.