At this point in your code:

<from uri="seda:processAndStoreInQueue" />
<unmarshal ref="myBeanio" />
Are you pulling the entire file into memory there? For a file that size that's probably not what you want, though it may be livable. ActiveMQ can handle a lot of throughput, but remember that if you are using persistent queues you necessarily lose several orders of magnitude in speed to persist and restore the data, and you are both marshaling and unmarshaling it. KahaDB and LevelDB are both journaling stores and are very fast, but any time you go from nanosecond RAM speeds to millisecond hard disk speeds you are going to slow down. And that's times two: down to the disk and back up into memory. Memory consumption is also going to go up. It still shouldn't take 6 hours to process.

Something like this could be used to put individual lines from a file onto your queue, and you can even send them in blocks if you like. But I don't know the form of your data.

<route>
    <from uri="file:inbox"/>
    <split streaming="true">
        <tokenize token="\n" />
        <to uri="activemq:queue:store"/>
    </split>
</route>

That route streams the file in, splits it on line boundaries, and drops each line onto the queue. Simultaneously you can start reading from that queue with the multithreaded consumer that you already have. This way you won't read the entire file into memory in one big chunk but will stream it in a line at a time, or you can add the "group" attribute to the tokenize line to specify how many lines should be read in at a time.

I'd just take that and try it with your seda queue structure first to see if you can get it to work right for you. When you read off the queue you can then unmarshal via BeanIO and do your transactions, and at that point you can specify min/max threads for reading off the queue. This should also let the JVM reclaim memory as it goes along, since the strings read in, taken off the queue, transformed, and processed are all eligible for garbage collection. If the entire file is held in memory and passed from queue to queue, that won't get released.
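To make the "group" variant and the min/max consumer threads concrete, here's a rough sketch. The group size of 100 and the consumer thread counts are arbitrary placeholders, not values from your configuration, and I'm reusing your myBeanio and seda:process.queue names just for illustration:

```xml
<!-- Producer side: stream the file and enqueue blocks of 100 lines per message. -->
<route>
    <from uri="file:inbox"/>
    <split streaming="true">
        <!-- group="100" makes each split exchange carry 100 lines instead of 1 -->
        <tokenize token="\n" group="100"/>
        <to uri="activemq:queue:store"/>
    </split>
</route>

<!-- Consumer side: drain the same queue with a pool that scales between 5 and 10 threads. -->
<route>
    <from uri="activemq:queue:store?concurrentConsumers=5&amp;maxConcurrentConsumers=10"/>
    <unmarshal ref="myBeanio"/>
    <to uri="seda:process.queue"/>
</route>
```

The concurrentConsumers/maxConcurrentConsumers options on the ActiveMQ endpoint are what give you the min/max reader threads I mentioned; tune the group size against your message broker's per-message overhead.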
On Sat, Apr 16, 2016 at 8:04 AM, Michele <michele.mazzi...@finconsgroup.com> wrote:
> Hi,
>
> as Ranx suggested, I tried chaining routes; see the attached screenshot of
> memory usage, seda-memory-usage.png
> <http://camel.465427.n5.nabble.com/file/n5781206/seda-memory-usage.png>.
> There is a clear improvement: on average 600 MB of memory used.
>
> The chain of routes configured is:
>
> <route id="FileRetriever_Route">
>     <from uri="{{uri.inbound}}?scheduler=quartz2&scheduler.cron={{poll.consumer.scheduler}}&scheduler.triggerId=FileRetriever&scheduler.triggerGroup=IF_CBIKIT{{uri.inbound.options}}" />
>     <to uri="seda:processAndStoreInQueue" />
> </route>
>
> <route id="Splitter_Route">
>     <from uri="seda:processAndStoreInQueue" />
>     <unmarshal ref="myBeanio" />
>     <split streaming="true" executorServiceRef="threadPoolExecutor">
>         <simple>${body}</simple>
>         <choice>
>             <when>
>                 <simple></simple>
>                 <setHeader headerName="CamelSplitIndex"><simple>${in.header.CamelSplitIndex}</simple></setHeader>
>                 <to uri="seda:process.queue" />
>             </when>
>             <otherwise>
>                 <log message="Message discarded ${in.header.CamelSplitIndex} - ${body}" />
>             </otherwise>
>         </choice>
>     </split>
> </route>
>
> <route id="ProcessAndStoreInSedaQueue">
>     <from uri="seda:process.queue?concurrentConsumers=10" />
>     <process ref="BodyEnricherProcessor" />
>     <to uri="dozer:transform?mappingFile=file:{{crt2.apps.home}}{{dozer.mapping.path}}&targetModel=com.fincons.ingenico.crt2.cbikit.inbound.model.SerialNumber" />
>     <marshal ref="Gson" />
>     <log message="Store in SEDA Queue: ${in.header.CamelSplitIndex} - ${body}" />
>     <to uri="seda:readyToSendRS.queue" />
> </route>
>
> <route id="ProcessMessageData_Route" errorHandlerRef="DLQErrorHandler">
>     <from uri="seda:readyToSendRS.queue?concurrentConsumers=10" />
>     <throttle timePeriodMillis="1000" asyncDelayed="true">
>         <constant>3</constant>
>         <enrich uri="direct:crm-login" strategyRef="OAuthStrategy" />
>         <recipientList>
>             <header>CompleteActionPath</header>
>         </recipientList>
>         <unmarshal ref="Gson" />
>         <choice>
>             <when>
>                 <simple>${in.header.Result} == 'ERROR'</simple>
>                 <log message="CRM Response: ${body}" loggingLevel="ERROR"/>
>             </when>
>         </choice>
>     </throttle>
> </route>
>
> Logs:
> Store in SEDA Queue: 36913 -
> {"data_caricamento_cbkit_c":"2016-04-05T08:17:51+02:00","data_import_cbikit_c":"2016-04-16T13:41:01+02:00","nome_file_cbikit_c":"20160405074559..csv","serialnumber_c":"12154WL60756738"}
>
> and the Throttling Policy in action:
> INFO | d #46 - Throttle | CRMLogin_Route ...
>
> In addition, in my JUnit test Camel is fast (less than 1 minute) for the
> Read-Unmarshal-Split-Process-MockResult processing.
>
> However, when I replaced seda:readyToSendRS.queue with
> activemq:queue:readyToSendRS, the memory usage tripled.
>
> I cannot use the SEDA component because, according to the documentation:
> this component does not implement any kind of persistence or recovery if
> the VM terminates while messages are yet to be processed. If you need
> persistence, reliability or distributed SEDA, try using either JMS or
> ActiveMQ.
>
> So, can ActiveMQ manage a large number of messages in a few seconds or
> minutes? Or do I need to define a load balancer on the broker?
>
> Thanks a lot for your support.
>
> Best Regards
>
> Michele
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Best-Strategy-to-process-a-large-number-of-rows-in-File-tp5779856p5781206.html
> Sent from the Camel - Users mailing list archive at Nabble.com.