Excellent - that would definitely help my solution, as I could use lock
files; if we had to kill the process, it would just delete them on the next
startup and re-process the files.

So when is 2.2.0 coming out? :-)

The solution I have works - and will probably be what we use for this
application. But that will help with future applications, as we do end up
writing a lot of file-based Camel processes.

Thanks for all the help!

Jonathan


Claus Ibsen-2 wrote:
> 
> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jqu...@abebooks.com> wrote:
>>
>> I took a good look at the Route Policy - at first the
>> ThrottlingInflightRoutePolicy class seemed like it could work, as I
>> really only want 5 exchanges to be in-flight at a time.
>>
>> Unfortunately it would never suspend the consumer.  I dug deeper into
>> the code and discovered why: the ThrottlingInflightRoutePolicy class
>> only checks the number of inflight exchanges AFTER an exchange has been
>> processed (the code to stop or start a consumer is all done in the
>> onExchangeDone method).
>>
>> Since in my case the exchanges will take a while to process - it wouldn't
>> know it had exceeded the maximum number until after it had finished
>> processing one of them.
>>
>> In my opinion that is a bug - or at the very least an important thing to
>> note in the documentation.  I spent a fair bit of time trying to figure
>> out why I could not get it to work the way it appeared it was supposed
>> to, all because it was not checking the inflight count against the
>> threshold until after it had finished processing an exchange.
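>>
>> Just to illustrate what I mean, here is a rough, untested sketch of the
>> kind of policy I was expecting - one that checks the limit in
>> onExchangeBegin instead of onExchangeDone. The class name is mine, and
>> it assumes the RoutePolicySupport helpers (startConsumer/stopConsumer)
>> and Route.getConsumer() from Camel 2.x:
>>
>> import java.util.concurrent.atomic.AtomicInteger;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Route;
>> import org.apache.camel.RuntimeCamelException;
>> import org.apache.camel.impl.RoutePolicySupport;
>>
>> public class InflightLimitRoutePolicy extends RoutePolicySupport {
>>
>>     private final int maxInflight;
>>     private final AtomicInteger inflight = new AtomicInteger();
>>
>>     public InflightLimitRoutePolicy(int maxInflight) {
>>         this.maxInflight = maxInflight;
>>     }
>>
>>     @Override
>>     public void onExchangeBegin(Route route, Exchange exchange) {
>>         // suspend the consumer BEFORE processing once the limit is hit
>>         if (inflight.incrementAndGet() >= maxInflight) {
>>             try {
>>                 stopConsumer(route.getConsumer());
>>             } catch (Exception e) {
>>                 throw new RuntimeCamelException(e);
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public void onExchangeDone(Route route, Exchange exchange) {
>>         // resume the consumer once we drop back under the limit
>>         if (inflight.decrementAndGet() < maxInflight) {
>>             try {
>>                 startConsumer(route.getConsumer());
>>             } catch (Exception e) {
>>                 throw new RuntimeCamelException(e);
>>             }
>>         }
>>     }
>> }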
>>
>> I also tried writing my own FileThrottlingRoutePolicy that would test
>> how many files were in an "inprogress" directory - and stop the consumer
>> if it exceeded the max concurrent files.
>>
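>> For reference, my attempt looked roughly like the sketch below (again
>> untested, the names are mine, and it makes the same Camel 2.x
>> RoutePolicySupport assumptions as the sketch above) - it counts the
>> files sitting in the in-progress directory on every exchange and stops
>> or restarts the consumer accordingly:
>>
>> import java.io.File;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Route;
>> import org.apache.camel.RuntimeCamelException;
>> import org.apache.camel.impl.RoutePolicySupport;
>>
>> public class FileThrottlingRoutePolicy extends RoutePolicySupport {
>>
>>     private final File inProgressDir;
>>     private final int maxConcurrentFiles;
>>
>>     public FileThrottlingRoutePolicy(String dir, int max) {
>>         this.inProgressDir = new File(dir);
>>         this.maxConcurrentFiles = max;
>>     }
>>
>>     @Override
>>     public void onExchangeBegin(Route route, Exchange exchange) {
>>         throttle(route);
>>     }
>>
>>     @Override
>>     public void onExchangeDone(Route route, Exchange exchange) {
>>         throttle(route);
>>     }
>>
>>     private void throttle(Route route) {
>>         // count the files currently sitting in the in-progress directory
>>         String[] names = inProgressDir.list();
>>         int count = names == null ? 0 : names.length;
>>         try {
>>             if (count >= maxConcurrentFiles) {
>>                 stopConsumer(route.getConsumer());
>>             } else {
>>                 startConsumer(route.getConsumer());
>>             }
>>         } catch (Exception e) {
>>             throw new RuntimeCamelException(e);
>>         }
>>     }
>> }
>>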
>> However I ran into read/write issues when I used the preMove option -
>> for some reason my later processing steps would throw exceptions about
>> the file not being found, or being locked (I can't remember which - I
>> have been trying so many different things today to get this working).
>>
>> In the end I solved my problem by avoiding my problem :)
>>
>> The primary reason I didn't want the lock files is that they would
>> require manual cleanup if we ever had to kill the process while it was
>> running.  Otherwise, the next time it started it would skip any of the
>> files that still had a lock file.
>>
> 
> We have a ticket for that
> https://issues.apache.org/activemq/browse/CAMEL-2082
> 
> 
> 
>> I re-wrote my route to work as follows:
>>
>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>
> 
> Nice solution :)
> 
>> This way - when files are finished they are moved to a "processed"
>> directory, and when they fail they are moved to a "failed" directory.
>> Anything still in the incoming directory is yet to be processed.
>> Because the record of what has and hasn't been processed is kept
>> entirely in memory, restarting the process will simply re-process any
>> files still in the incoming directory.
>>
>> No more lock files means a restart won't force us to delete .lock files
>> by hand.
>>
>> I still wish there were an easier way to do what I wanted.  Now I just
>> have to rely on threads(5) to do the limiting to 5 files at a time.
>> Although, if I understand your comment (and the documentation), I can't
>> actually rely on threads(5) to spawn 5 threads... it will just spawn UP
>> TO 5 threads depending on the system load?
>>
>> Jonathan
>>
>>
>>
>>
>> Claus Ibsen-2 wrote:
>>>
>>> Hi
>>>
>>> See also the route policy, which can throttle the file consumer to a
>>> pace of 5 concurrent files:
>>> http://camel.apache.org/routepolicy.html
>>>
>>>
>>>
>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <gabriel.magn...@steria.com>
>>> wrote:
>>>>
>>>>
>>>> jonathanq wrote:
>>>>>
>>>>> I am trying to write a process that will use a file endpoint (camel
>>>>> 2.1.0)
>>>>> to read from a directory.
>>>>>
>>>>> I need the process to read a file from the directory and then do some
>>>>> processing on the contents (namely hitting a REST service for each
>>>>> record in the file).  We have been asked to limit the number of
>>>>> threads hitting the service to 5, so we decided to simply process 5
>>>>> files at a time (to avoid concurrency issues with 5 threads reading
>>>>> from and writing to a single file).
>>>>>
>>>>> I tried a few different approaches, and I wanted to see if there was
>>>>> a way to do what I want.
>>>>>
>>>>> Approach 1:
>>>>>
>>>>> from("file://incoming").to("seda:filequeue")
>>>>>
>>>>> from("seda:filequeue").threads(5).process()
>>>>>
>>>>> Now - this reads in ALL of the files in the directory (placing a
>>>>> camelLock on each) and then sends them to the seda endpoint.  I saw
>>>>> log messages that referred to threads 1 through 6.  But from what I
>>>>> read in the documentation, threads() is not necessarily going to
>>>>> limit it to that number.
>>>>>
>>>
>>> threads(5) will limit to at most 5 concurrent threads from this point
>>> forward.
>>>
>>>
>>>>> Approach 2:
>>>>>
>>>>> from("file://incoming").threads(5).process()
>>>>>
>>>>> This only processed 5 files at a time - but created camelLocks on all
>>>>> files in the directory.
>>>>>
>>>>> So then I tried approach 3:
>>>>>
>>>>> from("file://incoming").to("seda:filequeue")
>>>>>
>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>
>>>>> Again this seems to work; however, it puts a camelLock on all the
>>>>> files (because they were all processed by the first part of the
>>>>> route, they are just queued up in the second).
>>>>>
>>>>>
>>>>> While approach 3 works - what I would really like is to not have the
>>>>> camelLock placed on the files that are not being processed.
>>>>>
>>>>> So watching the directory, there would be (at most) 5 files with
>>>>> camelLock files created at a time; when they finish they are moved
>>>>> to the .camel directory, and then the next file in the directory
>>>>> starts processing.
>>>>>
>>>
>>> You can also implement your own ProcessStrategy where you can deny
>>> consuming more than 5 files at any given time.
>>> See the processStrategy option on the file consumer. Just return false
>>> from the begin() method.
>>>
>>> See
>>> http://camel.apache.org/file2.html
>>> at the bottom of the page.
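>>>
>>> Something along these lines might do it (untested sketch - the class
>>> name is mine, it wraps whatever strategy you would otherwise use, and
>>> you should check the GenericFileProcessStrategy interface in your
>>> Camel version for the exact method set and throws clauses):
>>>
>>> import java.io.File;
>>> import java.util.concurrent.atomic.AtomicInteger;
>>> import org.apache.camel.Exchange;
>>> import org.apache.camel.component.file.GenericFile;
>>> import org.apache.camel.component.file.GenericFileEndpoint;
>>> import org.apache.camel.component.file.GenericFileOperations;
>>> import org.apache.camel.component.file.GenericFileProcessStrategy;
>>>
>>> public class MaxInflightProcessStrategy implements GenericFileProcessStrategy<File> {
>>>
>>>     private final GenericFileProcessStrategy<File> delegate;
>>>     private final int max;
>>>     private final AtomicInteger inflight = new AtomicInteger();
>>>
>>>     public MaxInflightProcessStrategy(GenericFileProcessStrategy<File> delegate, int max) {
>>>         this.delegate = delegate;
>>>         this.max = max;
>>>     }
>>>
>>>     public boolean begin(GenericFileOperations<File> operations,
>>>                          GenericFileEndpoint<File> endpoint,
>>>                          Exchange exchange, GenericFile<File> file) throws Exception {
>>>         // deny pickup when the limit is reached; the file stays in the
>>>         // directory and will be picked up on a later poll
>>>         if (inflight.incrementAndGet() > max) {
>>>             inflight.decrementAndGet();
>>>             return false;
>>>         }
>>>         boolean begun = delegate.begin(operations, endpoint, exchange, file);
>>>         if (!begun) {
>>>             inflight.decrementAndGet();
>>>         }
>>>         return begun;
>>>     }
>>>
>>>     public void commit(GenericFileOperations<File> operations,
>>>                        GenericFileEndpoint<File> endpoint,
>>>                        Exchange exchange, GenericFile<File> file) throws Exception {
>>>         inflight.decrementAndGet();
>>>         delegate.commit(operations, endpoint, exchange, file);
>>>     }
>>>
>>>     public void rollback(GenericFileOperations<File> operations,
>>>                          GenericFileEndpoint<File> endpoint,
>>>                          Exchange exchange, GenericFile<File> file) throws Exception {
>>>         inflight.decrementAndGet();
>>>         delegate.rollback(operations, endpoint, exchange, file);
>>>     }
>>> }
>>>
>>> Then register it in the registry (e.g. as "myStrategy") and refer to
>>> it from the endpoint: from("file://incoming?processStrategy=#myStrategy")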
>>>
>>>
>>>>> Is that possible?  Is there anything I should be sure to do in an
>>>>> error route so that I "roll back" the camel locks, to ensure that
>>>>> unprocessed files are ready to process the next time the application
>>>>> starts?
>>>>>
>>>>
>>>> Hi,
>>>>
>>>> Maybe you can try using the maxMessagesPerPoll parameter on the file
>>>> endpoint, e.g.:
>>>> from("file://incoming?maxMessagesPerPoll=5").threads(5).process()
>>>>
>>>> Check the file component documentation :
>>>> http://camel.apache.org/file2.html
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> Apache Camel Committer
>>>
>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>> Open Source Integration: http://fusesource.com
>>> Blog: http://davsclaus.blogspot.com/
>>> Twitter: http://twitter.com/davsclaus
>>>
>>>
>>
>>
>>
> 
> 
> 
> -- 
> Claus Ibsen
> Apache Camel Committer
> 
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
> 
> 

