Just for completeness, here's the latest version of my custom PollingConsumerPollStrategy:

https://gist.github.com/anonymous/17465c5ff78d9b75726563dc7727d5ae

It allows me to detect the specific error from Amazon and pass null down the route, or propagate the error in any other case. Is that a sane approach, or is there a better, cleaner, nicer, way more awesome way?

Thanks
Artur

On Tue, Jul 4, 2017 at 5:14 PM, Artur Jablonski <ajablon...@ravenpack.com> wrote:
> Right,
> So I managed to get that working, though I am not sure if that's the
> proper way of doing this or more of a hack.
>
> So again, I want to use pollEnrich() to fetch a specific file from S3. If
> a file doesn't exist, then I want to pass a null message down the route.
> To simplify things, let's say that for ANY error that happens during a
> poll I want to retry 3 times (with some backoff in between) and then give
> up and pass null. I DON'T want to use the timeout, as it seems brittle, as
> I explained in my previous email.
>
> The problem is that, by default, if there's an error while polling,
> pollEnrich() will try again and again indefinitely. Bummer.
>
> Taariq suggested using a custom pollStrategy to handle that scenario
> correctly. PollingConsumerPollStrategy has 3 methods, of which
> "boolean rollback(Consumer, Endpoint, int, Exception)" is of interest
> here, as it's the one called when an error happens during a poll. The
> method is supposed to return 'true' if you want to retry or 'false'
> otherwise. But here's the confusing thing: the
> DefaultPollingConsumerPollStrategy that's used if you don't define one
> always returns false, and yet the polling doesn't stop when an error
> happens.
>
> Fine, so after digging into the code, here's what I discovered:
>
> pollEnrich() in the Java DSL creates a PollEnricher in the route, which,
> among other types, implements Processor.
> PollEnricher, in its process() method, resolves a Consumer to which it
> delegates using one of receive()/receiveNoWait()/receive(timeout),
> depending on whether a timeout has been set. In my case I am not setting
> a timeout, so the method it delegates to is Consumer.receive().
>
> At runtime that Consumer is an instance of EventDrivenPollingConsumer,
> which in turn delegates to the S3Consumer (an implementation of
> ScheduledBatchPollingConsumer). The receive() method of
> EventDrivenPollingConsumer will BLOCK until there's an Exchange on its
> internal queue. An Exchange is put on that queue in the consumer's
> process() method, because EventDrivenPollingConsumer is itself a
> Processor.
>
> So here's what happens if polling yields some result (omitting method
> arguments for brevity):
>
>   PollEnricher.process()
>   EventDrivenPollingConsumer.start()
>   EventDrivenPollingConsumer.receive()
>   ScheduledPollConsumer.doStart() // effectively schedules the polling on the S3Consumer in a separate thread
>   BlockingQueue.take() // blocks this thread until there's an Exchange on the queue
>
> So now the question is: how does that polled Exchange end up on
> EventDrivenPollingConsumer's queue so that receive() can unblock and
> carry on?
>
>   ScheduledPollConsumer.run() // main entry to the polling task
>   S3Consumer.poll()
>   EventDrivenPollingConsumer.process()
>   BlockingQueue.put() // effectively unblocks PollEnricher.process(), and processing can carry on
>
> So there you have it: it's the EventDrivenPollingConsumer that creates
> the S3Consumer in this case, and it passes itself as a Processor to the
> constructor, so S3Consumer can invoke the process() method.
>
> Ok, so what happens in my use case? It's S3Consumer.poll() that throws
> an exception from the AWS service, so EventDrivenPollingConsumer.process()
> is not called.
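The hand-off described above can be approximated by a small, self-contained Java model. This is an illustration under stated assumptions, not Camel code: `HandoffModel` and `receive` are hypothetical names, a `ScheduledExecutorService` stands in for the ScheduledPollConsumer's polling task, and a `BlockingQueue` stands in for EventDrivenPollingConsumer's internal queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the poll/receive hand-off; not Camel's implementation.
final class HandoffModel {

    private HandoffModel() {
    }

    // Schedules `poll` every `delayMs` on a separate thread (the ScheduledPollConsumer role)
    // and waits for its first successful result on a queue (the EventDrivenPollingConsumer role).
    // timeoutMs < 0 waits indefinitely, like receive(); otherwise it waits at most timeoutMs.
    static <T> T receive(Callable<T> poll, long delayMs, long timeoutMs) {
        BlockingQueue<T> queue = new ArrayBlockingQueue<>(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                // A successful poll hands its result over, like process() -> BlockingQueue.put().
                queue.offer(poll.call());
            } catch (Exception e) {
                // A failing poll hands nothing over; the task simply runs again after delayMs.
            }
        }, 0, delayMs, TimeUnit.MILLISECONDS);
        try {
            // take() never returns if every poll throws: the "blocked forever" case.
            return timeoutMs < 0 ? queue.take() : queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In this model, a poll that always throws keeps being rescheduled but never puts anything on the queue, so an untimed receive would wait forever, while a bounded wait returns null once the time is up.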
>
> The scheduled polling task runs over and over again, and
> PollEnricher.process() stays blocked forever.
>
> So, back to PollingConsumerPollStrategy and its rollback() method.
> Like I said, even if I return false from it, the polling continues,
> because the polling task is never stopped. So what I ended up with was
> this:
>
> https://gist.github.com/anonymous/195f2b6c835db82decf1649969a717fe
>
> So basically, after 3 attempts I create an Exchange with a null body and
> call the process() method on it from the rollback() method so that the
> route can continue; I return false, and all looks good.
>
> Please note the dodgy Thread.sleep(), which I put there for the backoff
> between retries, because all the backoff* configurations from
> ScheduledPollConsumer work BETWEEN polls and not within the same poll,
> which is what happens if I return 'true' from the rollback() method.
>
> Is there a better way of doing this? This seems like a very common use
> case to me; perhaps it should be considered as a configuration option of
> pollEnrich() or of one of the other components involved here.
>
> What do I win for the longest email on the group? :p
>
> Best,
> Artur
>
> On Thu, Jun 22, 2017 at 5:36 PM, Artur Jablonski <ajablon...@ravenpack.com> wrote:
>
>> I will try that, thank you for the hint!
>>
>> On Wed, Jun 21, 2017 at 5:40 PM, Taariq Levack <taar...@gmail.com> wrote:
>>
>>> Then you can try the custom pollStrategy.
>>>
>>> On 21 Jun 2017 10:57, "Artur Jablonski" <ajablon...@ravenpack.com> wrote:
>>>
>>> > Hello!
>>> >
>>> > Thanks for the hint.
>>> >
>>> > So that works... kind of.
>>> >
>>> > First, with timeout(0) it doesn't work: it won't even try to poll for
>>> > anything. I suppose that's how it's supposed to work. Fine.
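As a rough sketch, assuming the three receive variants behave like the corresponding `BlockingQueue` calls (take(), poll() and poll(timeout)), the timeout convention discussed in this thread (-1 by default, 0 for no wait, greater than 0 to wait a bit) can be modeled as below. `TimeoutDispatchModel`, `receive` and `fetch` are hypothetical names, and the fixed poll latency is an assumption. In this model, a zero timeout returns before the scheduled poll has had a chance to deliver anything, which would be consistent with the timeout(0) observation above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of how a timeout value selects the receive variant; not Camel's code.
final class TimeoutDispatchModel {

    private TimeoutDispatchModel() {
    }

    // Negative (e.g. -1): wait indefinitely, like receive().
    // Zero: return only what is already available, like receiveNoWait().
    // Positive: wait up to timeoutMs, like receive(timeout).
    static <T> T receive(BlockingQueue<T> queue, long timeoutMs) throws InterruptedException {
        if (timeoutMs < 0) {
            return queue.take();
        }
        if (timeoutMs == 0) {
            return queue.poll();
        }
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Simulates a poll whose result arrives after pollLatencyMs, then receives with timeoutMs.
    static String fetch(String body, long pollLatencyMs, long timeoutMs) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.schedule(() -> {
            queue.offer(body);
        }, pollLatencyMs, TimeUnit.MILLISECONDS);
        try {
            return receive(queue, timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

For example, `fetch("content", 500, 50)` returns null even though the "object" exists, which mirrors the concern that a timeout shorter than the real request-response time can miss an existing file.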
>>> >
>>> > With timeout(>0) it works, as in it gives up after the given number of
>>> > milliseconds and null is propagated down the route, where I can catch
>>> > it in choice() and only process the non-null responses further.
>>> >
>>> > This seems a bit brittle to me. Setting this timeout too low could
>>> > potentially make me miss an existing file if some network glitch slowed
>>> > down the request-response cycle, and setting it high seems like a waste
>>> > when the AWS S3 consumer throws an exception that clearly tells me the
>>> > file I want doesn't exist.
>>> >
>>> > So what I am looking for, I guess, is an exception handling mechanism
>>> > for the Consumer, so that I can break polling on a given exception.
>>> >
>>> > I looked at http://camel.apache.org/polling-consumer.html
>>> > and at backoffErrorThreshold, backoffIdleThreshold and
>>> > backoffMultiplier as the only properties that mention error handling,
>>> > but I guess they're not what I want.
>>> >
>>> > Am I missing something?
>>> >
>>> > Cheers
>>> > Artur
>>> >
>>> > On Wed, Jun 21, 2017 at 6:26 AM, Taariq Levack <taar...@gmail.com> wrote:
>>> >
>>> > > You can use a timeout of 0 for receiving with no wait, or greater
>>> > > than 0 for waiting a bit.
>>> > > Default is -1.
>>> > >
>>> > > http://camel.apache.org/content-enricher.html
>>> > >
>>> > > Cheers,
>>> > > Taariq
>>> > >
>>> > > On 20 Jun 2017 22:12, "Artur Jablonski" <ajablon...@ravenpack.com> wrote:
>>> > >
>>> > > > Right, that almost works.
>>> > > >
>>> > > > Now the problem I have is that when a file I ask for doesn't exist,
>>> > > > the consumer seems to poll for it indefinitely. Is there a way of
>>> > > > saying: hey polling consumer, try n times and then give up. Throw an
>>> > > > error or return null. Don't insist, please.
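A minimal sketch of the "try n times, then give up" behavior, written as plain Java rather than as a PollingConsumerPollStrategy so that it runs without Camel. `BoundedRetry`, `fetchOrNull` and the `giveUpWithNull` predicate are hypothetical names; the predicate stands in for recognizing the specific "file doesn't exist" error from S3, and the doubling backoff is an assumed policy. Expected errors are retried with backoff and end in null; any other error is rethrown immediately.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical helper, not part of Camel: bounded retries with exponential backoff.
final class BoundedRetry {

    private BoundedRetry() {
    }

    // Calls `fetch` up to maxAttempts times. An error accepted by giveUpWithNull is retried
    // after a growing pause and, once attempts run out, turned into a null result.
    // Any other error is rethrown immediately without retrying.
    static <T> T fetchOrNull(Callable<T> fetch, int maxAttempts, long initialBackoffMs,
                             Predicate<Exception> giveUpWithNull) throws Exception {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return fetch.call();
            } catch (Exception e) {
                if (!giveUpWithNull.test(e)) {
                    throw e;                  // unexpected error: propagate it
                }
                if (attempt >= maxAttempts) {
                    return null;              // expected error, attempts used up: pass null on
                }
                Thread.sleep(backoffMs);      // back off before the next attempt
                backoffMs *= 2;
            }
        }
    }
}
```

In a custom strategy, the equivalent decision would have to be made in rollback(Consumer, Endpoint, int, Exception) from its retry counter and exception arguments; as noted in the Jul 4 message, returning false from rollback() alone does not unblock pollEnrich(), so this sketch only covers the retry decision itself.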
>>> > > >
>>> > > > Cheers
>>> > > > Artur
>>> > > >
>>> > > > On 20 Jun 2017 9:55 a.m., "Artur Jablonski" <ajablon...@ravenpack.com> wrote:
>>> > > >
>>> > > > > Thank you so much Gregor!
>>> > > > >
>>> > > > > You pushed me in the right direction. I initially wrote a patch for
>>> > > > > the S3 producer to get me what I wanted, but after seeing your reply
>>> > > > > I read about the Content Enricher pattern and pollEnrich() in Camel.
>>> > > > >
>>> > > > > This SO question was helpful as well:
>>> > > > > https://stackoverflow.com/questions/36948005/how-do-dynamic-from-endpoints-and-exchanges-work-in-camel
>>> > > > >
>>> > > > > In the end, with this route:
>>> > > > >
>>> > > > >     from("direct:pollEnrich").routeId("pollEnrich")
>>> > > > >         // the endpoint URI is evaluated per exchange; fileName selects the single S3 object
>>> > > > >         .pollEnrich()
>>> > > > >         .simple("aws-s3://{{buckets.mybucket}}?amazonS3Client=#amazonS3Client&deleteAfterRead=false&fileName=${in.header.CamelAwsS3Key}")
>>> > > > >         .convertBodyTo(byte[].class)
>>> > > > >         .to("bean:unmarshall");
>>> > > > >
>>> > > > > I can now poke the route and set the CamelAwsS3Key header to the
>>> > > > > file I want.
>>> > > > >
>>> > > > > Cheerio!
>>> > > > > Artur
>>> > > > >
>>> > > > > On Mon, Jun 19, 2017 at 10:43 PM, Gregor Zurowski <gre...@list.zurowski.org> wrote:
>>> > > > >
>>> > > > >> Hi Artur,
>>> > > > >>
>>> > > > >> You should be able to get a single S3 object with the camel-aws
>>> > > > >> component using the "fileName" query parameter on the consumer. See
>>> > > > >> the documentation at
>>> > > > >> https://github.com/apache/camel/blob/master/components/camel-aws/src/main/docs/aws-s3-component.adoc.
>>> > > > >>
>>> > > > >> Gregor
>>> > > > >>
>>> > > > >> On Mon, Jun 19, 2017 at 5:28 PM, Artur Jablonski
>>> > > > >> <ajablon...@ravenpack.com> wrote:
>>> > > > >> > Hello,
>>> > > > >> >
>>> > > > >> > I am trying to get a single object from S3 via Camel and I am not
>>> > > > >> > sure how to do this.
>>> > > > >> >
>>> > > > >> > It seems that the producer endpoint can only upload to S3 and the
>>> > > > >> > consumer endpoint can poll a bucket, but how can I express the use
>>> > > > >> > case of retrieving a single S3 object when I know its key?
>>> > > > >> >
>>> > > > >> > Best
>>> > > > >> > Artur