Excellent - that would definitely help my solution: I could use lock files, and if we had to kill the process, it would just delete them on the next startup and re-process the files.
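A rough sketch of what such a startup cleanup could look like, in plain Java. This is only an illustration and not Camel's own behaviour: the class name `StaleLockCleanup`, the method `deleteStaleLocks`, and the assumption that lock files sit next to the data files and end in `.camelLock` are all hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Camel: removes leftover lock files so that
// files abandoned by a killed process are picked up again on the next start.
class StaleLockCleanup {

    // Assumption: lock files sit next to the data files and end in ".camelLock".
    static final String LOCK_SUFFIX = ".camelLock";

    // Deletes every lock file directly inside dir and returns how many were removed.
    static int deleteStaleLocks(Path dir) throws IOException {
        int deleted = 0;
        try (DirectoryStream<Path> locks = Files.newDirectoryStream(dir, "*" + LOCK_SUFFIX)) {
            for (Path lock : locks) {
                if (Files.deleteIfExists(lock)) {
                    deleted++;
                }
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        // Simulate a directory left behind by a killed process.
        Path incoming = Files.createTempDirectory("incoming");
        Files.createFile(incoming.resolve("orders.csv"));
        Files.createFile(incoming.resolve("orders.csv" + LOCK_SUFFIX));
        System.out.println("Removed " + deleteStaleLocks(incoming) + " stale lock file(s)");
    }
}
```

Calling something like this before the routes start would leave the data files in place and only clear the markers, so the consumer sees them as unprocessed again.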
So when is 2.2.0 coming out? :-)

The solution I have works - and will probably be what we use for this application. But that will help with future applications, as we do end up writing a lot of file-based Camel processes.

Thanks for all the help!

Jonathan


Claus Ibsen-2 wrote:
>
> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jqu...@abebooks.com> wrote:
>>
>> I took a good look at the Route Policy - at first the
>> ThrottlingInflightRoutePolicy class seemed like it could work - as I
>> really only want 5 exchanges to be in-flight at a time.
>>
>> Unfortunately it would never suspend the consumer. I dug deeper into the
>> code and discovered why. The ThrottlingInflightRoutePolicy class only
>> checks the number of inflight exchanges AFTER an exchange has been
>> processed (the code to stop or start a consumer is all done in the
>> onExchangeDone method).
>>
>> Since in my case the exchanges will take a while to process - it wouldn't
>> know it had exceeded the maximum number until after it had finished
>> processing one of them.
>>
>> In my opinion that is a bug - or at the very least an important thing to
>> note in the documentation. I spent a fair bit of time trying to figure
>> out why I could not get it to work as it appeared it was supposed to. All
>> because it was not checking the inflight count against the threshold
>> until after it had finished processing an exchange.
>>
>> I also tried writing my own FileThrottlingRoutePolicy that would test how
>> many files were in an "inprogress" directory - and stop the consumer if it
>> exceeded the max concurrent files.
>>
>> However I ran into read/write issues when I used the preMove of files -
>> for some reason my processes later would throw exceptions about file not
>> found or file lock (I can't remember which - I have been trying so many
>> different things today to try and get this working).
>>
>> In the end I solved my problem by avoiding my problem :)
>>
>> The primary reason I didn't want the file locks to occur is that it would
>> be a manual cleanup if we ever had to kill the process while it's running.
>> Otherwise the next time it started, it would ignore any of the files that
>> had a lock file as well.
>>
>
> We have a ticket for that
> https://issues.apache.org/activemq/browse/CAMEL-2082
>
>
>
>> I re-wrote my route to work as follows:
>>
>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>
>
> Nice solution :)
>
>> This way - when files are "finished" they will be placed in a "processed"
>> directory, when they fail they are put in a "failed" directory. Anything
>> still in the incoming directory is to be processed. Because the record of
>> what was processed and what hasn't been is held only in memory -
>> restarting the process will just re-start any of the files still in the
>> incoming directory.
>>
>> No more lock files means restarting it won't cause us to have to delete
>> .lock files.
>>
>> I wish there was still an easier way to do what I wanted. Now I just
>> have to rely on the threads(5) to do the limiting to 5 files at a time.
>> Although if I understand your comment (and the documentation) I can't
>> actually rely on threads(5) to spawn 5 threads... it will just spawn UP TO
>> 5 threads depending on the system load?
>>
>> Jonathan
>>
>>
>>
>>
>> Claus Ibsen-2 wrote:
>>>
>>> Hi
>>>
>>> See also route policy to throttle the file consumer to a pace of 5
>>> concurrent files
>>> http://camel.apache.org/routepolicy.html
>>>
>>>
>>>
>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <gabriel.magn...@steria.com>
>>> wrote:
>>>>
>>>>
>>>> jonathanq wrote:
>>>>>
>>>>> I am trying to write a process that will use a file endpoint (camel
>>>>> 2.1.0) to read from a directory.
>>>>>
>>>>> I need the process to read a file from the directory and then do some
>>>>> processing on the contents (namely hitting a REST service for each
>>>>> record in the file). We have been asked to limit the number of threads
>>>>> that are hitting the service to 5. So we decided to simply process 5
>>>>> files at a time (to avoid concurrency issues reading 1 file and writing
>>>>> to 1 file with 5 threads).
>>>>>
>>>>> I tried a few different approaches, and I wanted to see if there was a
>>>>> way to do what I want.
>>>>>
>>>>> Approach 1:
>>>>>
>>>>> from("file://incoming").to("seda:filequeue")
>>>>>
>>>>> from("seda:filequeue").thread(5).process()
>>>>>
>>>>> Now - this reads in ALL of the files in the directory (places
>>>>> camelLock on all) and then sends them to the seda endpoint. I saw log
>>>>> messages that referred to thread 1 through 6. But from what I read in
>>>>> the documentation, thread() is not necessarily going to limit it at
>>>>> that number.
>>>>>
>>>
>>> thread(5) will limit to at most 5 concurrent threads from this point
>>> forward.
>>>
>>>
>>>>> Approach 2:
>>>>>
>>>>> from("file://incoming").thread(5).process()
>>>>>
>>>>> This only processed 5 files at a time - but created camelLocks on all
>>>>> files in the directory.
>>>>>
>>>>> So then I tried approach 3:
>>>>>
>>>>> from("file://incoming").to("seda:filequeue")
>>>>>
>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>
>>>>> Again this seems to work, however it puts a camelLock on all the files
>>>>> (because they were all processed by the first part of the route, they
>>>>> are just queued up in the second).
>>>>>
>>>>>
>>>>> While approach 3 works - what I would really like is to not have the
>>>>> camelLock placed on the files that are not being processed.
>>>>>
>>>>> So watching the directory, there would be (at most) 5 files with
>>>>> camelLock files created at a time, when they finish they are moved to
>>>>> the .camel directory, and then it starts processing the next file in
>>>>> the directory.
>>>>>
>>>
>>> You can also implement your own ProcessStrategy where you can deny
>>> consuming more than 5 files at any given time.
>>> See the processStrategy option on the file consumer. Just return false
>>> on the begin() method.
>>>
>>> See
>>> http://camel.apache.org/file2.html
>>> at the bottom of the page.
>>>
>>>
>>>>> Is that possible? Is there anything I should be sure to do in an
>>>>> error route so that I "roll back" the camel locks to ensure that
>>>>> unprocessed files are ready to process the next time the application
>>>>> starts?
>>>>>
>>>>
>>>> Hi,
>>>>
>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>>> endpoint, i.e.:
>>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>>
>>>> Check the file component documentation:
>>>> http://camel.apache.org/file2.html
>>>>
>>>> --
>>>> View this message in context:
>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> Apache Camel Committer
>>>
>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>> Open Source Integration: http://fusesource.com
>>> Blog: http://davsclaus.blogspot.com/
>>> Twitter: http://twitter.com/davsclaus
>>>
>>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.

--
View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26980045.html
Sent from the Camel - Users mailing list archive at Nabble.com.
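
For reference, the throttling point raised in this thread (a limit that is only checked after an exchange finishes cannot hold back slow exchanges) can be illustrated in plain Java by reserving a slot before work starts. This is a sketch under stated assumptions: it is not Camel code and not how ThrottlingInflightRoutePolicy is implemented, and `InflightLimiter`, `simulate`, and the 20 ms sleep standing in for a slow file are made up for the illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java illustration (no Camel involved): the in-flight limit is enforced
// BEFORE work starts, instead of being checked only after a unit of work ends.
class InflightLimiter {

    private final Semaphore permits;
    private final AtomicInteger inflight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    InflightLimiter(int maxInflight) {
        this.permits = new Semaphore(maxInflight);
    }

    // Blocks until a slot is free, runs the task, then frees the slot.
    void process(Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            int now = inflight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // remember the highest concurrency seen
            task.run();
        } finally {
            inflight.decrementAndGet();
            permits.release();
        }
    }

    int peakInflight() {
        return peak.get();
    }

    // Runs `files` simulated slow jobs on `workers` threads and returns the peak concurrency.
    static int simulate(int files, int workers, int maxInflight) throws InterruptedException {
        InflightLimiter limiter = new InflightLimiter(maxInflight);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < files; i++) {
            pool.submit(() -> {
                try {
                    limiter.process(() -> sleepQuietly(20)); // stand-in for a slow file
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return limiter.peakInflight();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 10 worker threads compete, but at most 5 jobs are ever in flight.
        System.out.println("Peak in-flight: " + simulate(20, 10, 5));
    }
}
```

Because the permit is taken before the task runs, the peak never exceeds the limit regardless of how long each job takes; a check performed only on completion would let all ten workers start before the first count is ever compared.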