Claus,

I figured out why my custom Route Policy wasn't working for me.  

It seems that there is an issue when I use preMove on the File endpoint -
when the Exchange reaches my first Processor, its body is no longer a
GenericFile.

My processor does this on the first line:

GenericFile file = exchange.getIn().getBody(GenericFile.class);

This works fine with my route - but as soon as I use preMove to put the
in-progress files into a "processing" directory, the type of the body is no
longer GenericFile but java.io.File.  So of course, file is now null.

This isn't an issue for my code, I can change the type and everything will
work.  But it seems very strange that the body type changes when "preMove"
is set on the endpoint.
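
For reference, the kind of change I mean is roughly the following.  This is
only a sketch in plain Java so it compiles without Camel: WrappedFile is a
hypothetical stand-in for GenericFile, and BodyFiles.toFile() is a helper
name I made up, not Camel API.  It accepts either body type instead of
assuming one.

```java
import java.io.File;

// Hypothetical stand-in for Camel's GenericFile wrapper, used only so this
// sketch compiles without Camel on the classpath.
final class WrappedFile {
    private final File file;

    WrappedFile(File file) {
        this.file = file;
    }

    File getFile() {
        return file;
    }
}

// Hypothetical helper (not Camel API) that tolerates both body types.
final class BodyFiles {
    private BodyFiles() {
    }

    // Returns the underlying java.io.File for either body type, or null
    // when the body is neither of them.
    static File toFile(Object body) {
        if (body instanceof WrappedFile) {
            return ((WrappedFile) body).getFile();
        }
        if (body instanceof File) {
            return (File) body;
        }
        return null;
    }

    public static void main(String[] args) {
        File plain = new File("processing/a.txt");
        System.out.println(toFile(plain).getName());
        System.out.println(toFile(new WrappedFile(plain)).getName());
    }
}
```

In the real processor the argument would be the message body taken as a
plain Object, so the same first line works with and without preMove.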

Is that expected behavior?

Jonathan


Claus Ibsen-2 wrote:
> 
> On Thu, Dec 31, 2009 at 5:39 PM, jonathanq <jqu...@abebooks.com> wrote:
>>
>> Excellent - that would definitely help my solution, as I could use lock
>> files
>> and if we had to kill the process, it would just delete those on next
>> start
>> up and re-process the files.
>>
>> So when is 2.2.0 coming out? :-)
>>
> I implemented this feature today so yeah it will be in 2.2
> 
> Early 2010. I hope we get it out in the start of February. The last
> major goal is to have an improved thread pool configuration.
> 
> 
>> The solution I have works - and will probably be what we will use for this
>> application. But that will help with future applications as we do end up
>> writing a lot of file based camel processes.
>>
> 
> Yeah file is actually much harder than at first thought. Our goal is
> to make the file / ftp components in Camel flexible and to cover many
> of the use cases out there. So any feedback is valuable.
> 
> I will give it some thought to see if we can change the dynamic inflight
> throttler to be measuring metrics a bit earlier.
> 
> Okay I guess it's time to celebrate the new year.
> 
> 
> 
>> Thanks for all the help!
>>
>> Jonathan
>>
>>
>> Claus Ibsen-2 wrote:
>>>
>>> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jqu...@abebooks.com> wrote:
>>>>
>>>> I took a good look at the Route Policy - at first the
>>>> ThrottlingInflightRoutePolicy class seemed like it could work - as I
>>>> really
>>>> only want 5 exchanges to be in-flight at a time.
>>>>
>>>> Unfortunately it would never suspend the consumer.  I dug deeper into
>>>> the
>>>> code and discovered why.  The ThrottlingInflightRoutePolicy class only
>>>> checks the number of inflight exchanges AFTER an exchange has been
>>>> processed
>>>> (the code to stop or start a consumer is all done in the onExchangeDone
>>>> method).
>>>>
>>>> Since in my case the exchanges will take a while to process - it
>>>> wouldn't
>>>> know it had exceeded the maximum number until after it had finished
>>>> processing one of them.
>>>>
>>>> In my opinion that is a bug - or at the very least an important thing
>>>> to note in the documentation.  I spent a fair bit of time trying to
>>>> figure out why I could not get it to work as it appeared it was
>>>> supposed to.  All because it was not checking the inflight numbers
>>>> against the threshold until after it had finished processing an
>>>> exchange.
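
To make the timing point above concrete, here is a minimal sketch in plain
Java of a limit that is checked on entry rather than on completion.  It is
not Camel code: InflightGate, tryBegin() and done() are hypothetical names,
and the only real API used is java.util.concurrent.Semaphore.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper, not part of Camel: it admits work on entry, so the
// limit holds even when each unit of work takes a long time to finish.
final class InflightGate {
    private final Semaphore permits;

    InflightGate(int maxInflight) {
        this.permits = new Semaphore(maxInflight);
    }

    // Called BEFORE a unit of work starts; false means the limit is
    // already reached and the caller should not start it yet.
    boolean tryBegin() {
        return permits.tryAcquire();
    }

    // Called when a unit of work finishes, freeing a slot for the next.
    void done() {
        permits.release();
    }

    public static void main(String[] args) {
        InflightGate gate = new InflightGate(5);
        for (int i = 1; i <= 6; i++) {
            System.out.println("file " + i + " admitted: " + gate.tryBegin());
        }
    }
}
```

A check made only in done() would not see the sixth file until one of the
first five had completed, which is the behaviour described above.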
>>>>
>>>> I also tried writing my own FileThrottlingRoutePolicy that would test
>>>> how
>>>> many files were in an "inprogress" directory - and stop the consumer if
>>>> it
>>>> exceeded the max concurrent files.
>>>>
>>>> However I ran into read/write issues when I used the preMove of files -
>>>> for
>>>> some reason my processes later would throw exceptions about file not
>>>> found
>>>> or file lock (I can't remember which - I have been trying so many
>>>> different
>>>> things today to try and get this working).
>>>>
>>>> In the end I solved my problem by avoiding my problem :)
>>>>
>>>> The primary reason I didn't want the file locks to occur is it would be
>>>> a
>>>> manual cleanup if we ever had to kill the process while it's running.
>>>> Otherwise the next time it started, it would ignore any of the files
>>>> that
>>>> had a lock file as well.
>>>>
>>>
>>> We have a ticket for that
>>> https://issues.apache.org/activemq/browse/CAMEL-2082
>>>
>>>
>>>
>>>> I re-wrote my route to work as follows:
>>>>
>>>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>>>
>>>
>>> Nice solution :)
>>>
>>>> This way - when files are "finished" they will be placed in a
>>>> "processed" directory, when they fail they are put in a "failed"
>>>> directory.  Anything still in the incoming directory is to be
>>>> processed.  Because the record of what has and hasn't been processed
>>>> is held only in memory, restarting the process will just re-start any
>>>> of the files still in the incoming directory.
>>>>
>>>> No more Lock files means restarting it won't cause us to have to delete
>>>> .lock files.
>>>>
>>>> I wish there was still an easier way to do what I wanted.  Now I just
>>>> have to rely on the threads(5) to do the limiting to 5 files at a time.
>>>> Although if I understand your comment (and the documentation) I can't
>>>> actually rely on threads(5) to spawn 5 threads... it will just spawn UP
>>>> TO 5 threads depending on the system load?
>>>>
>>>> Jonathan
>>>>
>>>>
>>>>
>>>>
>>>> Claus Ibsen-2 wrote:
>>>>>
>>>>> Hi
>>>>>
>>>>> See also route policy to throttle the file consumer to a pace of 5
>>>>> concurrent files
>>>>> http://camel.apache.org/routepolicy.html
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez
>>>>> <gabriel.magn...@steria.com>
>>>>> wrote:
>>>>>>
>>>>>>
>>>>>> jonathanq wrote:
>>>>>>>
>>>>>>> I am trying to write a process that will use a file endpoint (camel
>>>>>>> 2.1.0)
>>>>>>> to read from a directory.
>>>>>>>
>>>>>>> I need the process to read a file from the directory and then do
>>>>>>> some
>>>>>>> processing on the contents (namely hitting a REST service for each
>>>>>>> record
>>>>>>> in the file).  We have been asked to limit the number of threads
>>>>>>> that
>>>>>>> are
>>>>>>> hitting the service to 5.  So we decided to simply process 5 files
>>>>>>> at
>>>>>>> a
>>>>>>> time (to avoid concurrency issues reading 1 file and writing to 1
>>>>>>> file
>>>>>>> with 5 threads)
>>>>>>>
>>>>>>> I tried a few different approaches, and I wanted to see if there was
>>>>>>> a
>>>>>>> way
>>>>>>> to do what I want.
>>>>>>>
>>>>>>> Approach 1:
>>>>>>>
>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>
>>>>>>> from("seda:filequeue").thread(5).process()
>>>>>>>
>>>>>>> Now - this reads in ALL of the files in the directory (places
>>>>>>> camelLock on all) and then sends them to the seda endpoint.  I saw
>>>>>>> log messages that referred to thread 1 through 6.  But from what I
>>>>>>> read in the documentation, thread() is not necessarily going to
>>>>>>> limit it at that number.
>>>>>>>
>>>>>
>>>>> thread(5) will limit to at most 5 concurrent threads from this point
>>>>> forward.
>>>>>
>>>>>
>>>>>>> Approach 2:
>>>>>>>
>>>>>>> from("file://incoming").thread(5).process()
>>>>>>>
>>>>>>> This only processed 5 files at a time - but created camelLocks on
>>>>>>> all
>>>>>>> files in the directory.
>>>>>>>
>>>>>>> So then I tried approach 3:
>>>>>>>
>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>
>>>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>>>
>>>>>>> Again this seems to work, however it puts a camelLock on all the
>>>>>>> files
>>>>>>> (because they were all processed by the first part of the route,
>>>>>>> they
>>>>>>> are
>>>>>>> just queued up in the second).
>>>>>>>
>>>>>>>
>>>>>>> While approach 3 works - what I would really like is to not have the
>>>>>>> camelLock placed on the files that are not being processed.
>>>>>>>
>>>>>>> So watching the directory, there would be (at most) 5 files with
>>>>>>> camelLock
>>>>>>> files created at a time, when they finish they are moved to the
>>>>>>> .camel
>>>>>>> directory, and then it starts processing the next file in the
>>>>>>> directory.
>>>>>>>
>>>>>
>>>>> You can also implement your own ProcessStrategy where you can deny
>>>>> consuming more than 5 files at any given time.
>>>>> See the processStrategy option on the file consumer. Just return false
>>>>> from the begin() method.
>>>>>
>>>>> See
>>>>> http://camel.apache.org/file2.html
>>>>> at the bottom of the page.
>>>>>
>>>>>
>>>>>>> Is that possible?  Is there anything I should be sure to do in an
>>>>>>> error
>>>>>>> route so that I "roll back" the camel locks to ensure that
>>>>>>> unprocessed
>>>>>>> files are ready to process the next time the application starts?
>>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>>>>> endpoint i.e.:
>>>>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>>>>
>>>>>> Check the file component documentation :
>>>>>> http://camel.apache.org/file2.html
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Claus Ibsen
>>>>> Apache Camel Committer
>>>>>
>>>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>>>> Open Source Integration: http://fusesource.com
>>>>> Blog: http://davsclaus.blogspot.com/
>>>>> Twitter: http://twitter.com/davsclaus
>>>>>
>>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>
>>>>
>>>
>>>
>>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26980045.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>>
> 
> 
> 

-- 
View this message in context: 
http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26981331.html
Sent from the Camel - Users mailing list archive at Nabble.com.