The easiest way to think about SEDA queues is to recognize why they were
created; I think that will help you decompose problems like this one. SEDA
stands for Staged Event-Driven Architecture.  It was originally invented
for automatic management of threads across queues: a stage that has too few
threads could steal some from another that has too many, so the whole
system degrades gracefully under high load.  In our current context,
though, you can think of it as an easy way to decouple processing stages
and assign threads to each one.

Just decompose the problem into a number of separate seda: queues and chain
the calls together, with a log statement in each so you can make sure you
aren't accidentally reading an entire file in and picking up a huge lump
of data.

<route id="stream.file.in">
     <from uri="file:/inbox"/>
     <unmarshal...."/>
     <log message="${body}"/>
     <to uri="seda:process.queue"/>
</route>

<route id="process.records">
     <from uri="seda:process.queue?concurrentConsumers=10"/>
     <log message="${body}"/>
     <!-- etc. -->
</route>

Make sure you are only seeing individual rows come through.  Obviously I
left out a lot of the data streaming/unmarshalling logic there, but a
quick, simple route like that, with the appropriate "streaming" and
splitting as necessary, should result in the second stage getting only
single lines, or single records if they are multi-line.
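
As a slightly fuller sketch of the same two routes: the newline tokenizer,
the queue size of 1000 and blockWhenFull are my assumptions for
illustration, not something taken from your routes, so swap in whatever
splitting and unmarshalling you actually use.

<route id="stream.file.in">
     <from uri="file:/inbox"/>
     <!-- assumption: one record per line; streaming avoids loading the whole file -->
     <split streaming="true">
          <tokenize token="\n"/>
          <!-- a bounded queue makes a fast reader wait instead of filling memory -->
          <to uri="seda:process.queue?size=1000&amp;blockWhenFull=true"/>
     </split>
</route>

<route id="process.records">
     <from uri="seda:process.queue?size=1000&amp;concurrentConsumers=10"/>
     <log message="${body}"/>
     <!-- per-record unmarshalling and processing goes here -->
</route>

With that shape, each exchange on the queue should be one line, and the
log in the second route is where you would confirm it.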

On Fri, Apr 15, 2016 at 9:01 AM, Brad Johnson <brad.john...@mediadriver.com>
wrote:

> 6 hours?  I'm currently streaming a 50k line file through Beanio with seda
> queues in and doing similar processing in under a minute.
>
> You have a queue here that appears to have something on it that is being
> unmarshalled and then split.  How big is that object on the queue?  Is it
> only a single line of data? Try this.
>
> <route id="ProcessAndStoreInQueue_Route">
>                         <from
> uri="seda:processAndStoreInQueue?concurrentConsumers=30" />
>
>
>
> On Fri, Apr 15, 2016 at 8:25 AM, Jens Breitenstein <mailingl...@j-b-s.de>
> wrote:
>
>> Hi Michele
>>
>> Reading a CSV with 40k lines using Camel in streaming mode takes a few
>> seconds. As you limit the queue size to avoid OOM, the entire performance
>> depends on how fast you can empty the queue.
>> How long does processing of ONE message take on average? To me it looks
>> like approximately 0.6 secs (6*60*60/35000), i.e. about 1.6 messages per
>> second. Is the process responsible for reading the queue single-threaded?
>>
>> Jens
>>
>>
>> On 15/04/16 at 14:59, Michele wrote:
>>
>> Hi,
>>>
>>> I spent a bit of time reading different topics on this issue, and I
>>> changed my route like this, reducing memory usage by about 300 MB:
>>>
>>> <route id="FileRetriever_Route">
>>>                         <from
>>>
>>> uri="{{uri.inbound}}?scheduler=quartz2&amp;scheduler.cron={{poll.consumer.scheduler}}&amp;scheduler.triggerId=FileRetriever&amp;scheduler.triggerGroup=IF_CBIKIT{{uri.inbound.options}}"
>>> />
>>>                         <setHeader
>>>
>>> headerName="ImportDateTime"><simple>${date:now:yyyyMMdd-HHmmss}</simple></setHeader>
>>>                         <setHeader
>>>
>>> headerName="MsgCorrelationId"><simple>CBIKIT_INBOUND_${in.header.ImportDateTime}</simple></setHeader>
>>>                         <setHeader headerName="breadcrumbId">
>>>
>>>
>>> <simple>Import-${in.header.CamelFileName}-${in.header.ImportDateTime}-${in.header.breadcrumbId}</simple>
>>>                 </setHeader>
>>>                         <to uri="seda:processAndStoreInQueue" />
>>>                         <log message="END - FileRetriever_Route" />
>>>                 </route>
>>>
>>>                 <route id="ProcessAndStoreInQueue_Route">
>>>                         <from uri="seda:processAndStoreInQueue" />
>>>                         <unmarshal>
>>>                                 <bindy type="Csv"
>>> classType="com.fincons.ingenico.crt2.cbikit.inbound.model.RowData"/>
>>>                         </unmarshal>
>>>
>>>                         <split streaming="true"
>>> executorServiceRef="myThreadPoolExecutor" >
>>>                                 <simple>${body}</simple>
>>>                                 <choice>
>>>                                         <when>
>>>                                                 <simple></simple>
>>>                                                 <setHeader
>>>
>>> headerName="CamelSplitIndex"><simple>${in.header.CamelSplitIndex}</simple></setHeader>
>>>                                                 <process
>>> ref="BodyEnricherProcessor" />
>>>                                                 <to
>>>
>>> uri="dozer:transform?mappingFile=file:{{crt2.apps.home}}{{dozer.mapping.path}}&amp;targetModel=com.fincons.ingenico.crt2.cbikit.inbound.model.SerialNumber"
>>> />
>>>                                                 <marshal ref="Gson" />
>>>                                                 <to
>>> uri="activemq:queue:CBIKIT"  />
>>>                                         </when>
>>>                                         <otherwise>
>>>                                                 <log message="Message
>>> discarded ${in.header.CamelSplitIndex} -
>>> ${body}" />
>>>                                         </otherwise>
>>>                                 </choice>
>>>                         </split>
>>>                 </route>
>>>
>>> The last test successfully processed a 35000-line CSV file in about 6h
>>> with an average memory usage of 1400 MB. But can I improve processing
>>> performance further?
>>>
>>> In addition, I noticed that the queue size of the queue is low. Why? (Is
>>> the producer slower than the consumer?)
>>>
>>> Thanks in advance.
>>>
>>> Best Regards
>>>
>>> Michele
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://camel.465427.n5.nabble.com/Best-Strategy-to-process-a-large-number-of-rows-in-File-tp5779856p5781168.html
>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>
>>
>>
>
