Just for completeness,

Here's the latest version of my custom PollingConsumerPollStrategy

https://gist.github.com/anonymous/17465c5ff78d9b75726563dc7727d5ae

It allows me to detect the specific error from Amazon and pass null down
the route, or propagate the error in any other case.
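
For anyone who doesn't want to open the gist, the decision itself looks
roughly like the sketch below. It is not a copy of the gist: it is plain
Java with no Camel or AWS types so it can run on its own, and
PollErrorPolicy, Action and ObjectNotFoundException are names made up for
illustration. In the real strategy the check would be against whatever
exception the S3 client throws for a missing key.

```java
/** Hypothetical helper: decides what to do with an exception raised by a poll. */
final class PollErrorPolicy {

    /** What the strategy should do with a failed poll. */
    enum Action { PASS_NULL_BODY, PROPAGATE }

    /** Made-up stand-in for the "no such key" failure; not a real AWS SDK type. */
    static final class ObjectNotFoundException extends RuntimeException {
        ObjectNotFoundException(String key) {
            super("No such key: " + key);
        }
    }

    /** Walks the cause chain, because the failure may arrive wrapped in another exception. */
    static Action decide(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof ObjectNotFoundException) {
                return Action.PASS_NULL_BODY; // missing file: let the route continue with null
            }
        }
        return Action.PROPAGATE; // anything else is a real error
    }

    public static void main(String[] args) {
        System.out.println(decide(new ObjectNotFoundException("a.txt")));
        System.out.println(decide(new RuntimeException(new ObjectNotFoundException("a.txt"))));
        System.out.println(decide(new IllegalStateException("network down")));
    }
}
```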

Is that a sane approach, or is there a better, cleaner, nicer, way more
awesome way?

Thanks
Artur



On Tue, Jul 4, 2017 at 5:14 PM, Artur Jablonski <ajablon...@ravenpack.com>
wrote:

> Right,
> So I managed to get that working, though I am not sure if that's the
> proper way of doing this or more of a hack.
>
> So again, I want to use pollEnrich() to fetch a specific file from S3. If
> a file doesn't exist then I want to pass a null message down the route. To
> simplify things let's say that for ANY error that happens during poll I
> want to retry 3 times (with some backoff between) and then give up and pass
> null. I DON'T want to use the timeout, as it seems brittle, as I explained
> in my previous email
>
> The problem with that is that it seems that by default if there's an error
> while polling, pollEnrich() will try again and again indefinitely. Bummer.
>
> Taariq suggested using a custom pollStrategy to achieve correct handling
> of that scenario. PollingConsumerPollStrategy has 3 methods, of which the
> "boolean rollback(Consumer, Endpoint, int, Exception)" is of interest here
> as it's the one called when an error happens during a poll. The method is
> supposed to return 'true' if you want to retry or 'false' otherwise. But
> here's the confusing thing. The DefaultPollingConsumerPollStrategy that's
> used if you don't define one always returns false and yet the polling
> doesn't stop when an error happens.
>
> Fine, so after digging into the code here's what I discovered:
>
> pollEnrich() in the Java DSL creates a PollEnricher in the route which,
> among other types, implements Processor. PollEnricher in its process()
> method resolves a Consumer to which it delegates using one of the
> receive()/receiveNoWait()/receive(timeout) methods, depending on whether
> a timeout has been set. In my case I am not setting a timeout, so the
> method it delegates to is Consumer.receive().
>
> At runtime that Consumer is an instance of EventDrivenPollingConsumer that
> in turn delegates to the S3Consumer (which is an implementation of
> ScheduledBatchPollingConsumer). The receive() method of the
> EventDrivenPollingConsumer implementation will BLOCK until there's an
> Exchange on its internal queue. An Exchange is put on that queue in the
> Consumer's process() method, because EventDrivenPollingConsumer is itself
> a Processor.
>
> So here's what happens if polling yields some result (omitting the method
> arguments for brevity):
>
> PollEnricher.process()
>      EventDrivenPollingConsumer.start()
>      EventDrivenPollingConsumer.receive()
>      ScheduledPollConsumer.doStart() // effectively schedules the polling
>                                      // on the S3Consumer in a separate thread
>      BlockingQueue.take() // blocks this thread until there's an
>                           // Exchange on the queue
>
>
> So now the question is how that polled exchange ends up on
> EventDrivenPollingConsumer's queue so that it can unblock and carry on.
>
> ScheduledPollConsumer.run()  //main entry to the polling task
>     S3Consumer.poll()
>     EventDrivenPollingConsumer.process()
>     BlockingQueue.put() // this effectively unblocks
>                         // PollEnricher.process() so processing can carry on
>
>
> So there you have it, it's the EventDrivenPollingConsumer that is creating
> the S3Consumer in this case and it passes itself as a Processor to the
> constructor, so S3Consumer can invoke the process() method.
>
>
> Ok, so what happens in my use case? It's the S3Consumer.poll() that throws
> an exception from the AWS service, so EventDrivenPollingConsumer.process()
> is never called. The scheduled polling task will run over and over again,
> and PollEnricher.process() stays blocked forever.
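
The hand-off described above can be imitated in plain Java. This is only a
rough sketch of the mechanism, not Camel's code: HandOffDemo and its
receive() method are made-up names, a Supplier stands in for
S3Consumer.poll(), and the demo waits on the queue with a time limit
instead of take() so that it actually terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical imitation of the receive()/process() hand-off over a queue. */
final class HandOffDemo {

    /**
     * Schedules 'poll' every 10 ms on a background thread and waits up to
     * waitMillis for a result on the hand-off queue. Returns null if none arrived.
     */
    static String receive(Supplier<String> poll, long waitMillis, AtomicInteger attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            attempts.incrementAndGet();
            try {
                queue.offer(poll.get()); // the "process()" step: hand the result over
            } catch (RuntimeException e) {
                // failed poll: nothing reaches the queue and the task simply runs again
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            // take() would wait forever here; a timed poll keeps the demo finite
            return queue.poll(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger okAttempts = new AtomicInteger();
        System.out.println("file exists:  " + receive(() -> "contents", 500, okAttempts));

        AtomicInteger failedAttempts = new AtomicInteger();
        String result = receive(() -> {
            throw new IllegalStateException("no such key");
        }, 300, failedAttempts);
        System.out.println("file missing: " + result + " after " + failedAttempts.get() + " polls");
    }
}
```

In the failing case the poll keeps being rescheduled and the waiting side
never gets anything, which is the behaviour I am seeing with pollEnrich().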
>
> So back to the PollingConsumerPollStrategy and its rollback() method.
> Like I said, even if I return false from it, the polling will continue,
> because the polling task is never stopped. So what I ended up with was this:
>
> https://gist.github.com/anonymous/195f2b6c835db82decf1649969a717fe
>
> So basically after 3 attempts I create an Exchange with a null body and
> run the process() method on it from the rollback() method so that the
> route can continue. I return false and all looks good.
>
> Please note the dodgy Thread.sleep() which I put there for the backoff
> between retries, because all the backoff* configurations from
> ScheduledPollConsumer work BETWEEN polls and not within the same
> poll, which is what is happening if I return 'true' from the rollback()
> method.
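
The retry-then-give-up logic can be sketched without Camel as below. This
is not the code from the gist, just the same idea under some assumptions:
GiveUpAfterRetries and runPollCycle() are made-up names, a Consumer<Object>
stands in for handing an Exchange to process(), the retry counter is
assumed to be 0 on the first failure, and the backoff is linear.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical stand-in for a poll strategy; mirrors only the shape of rollback(). */
final class GiveUpAfterRetries {
    private final int maxAttempts;
    private final long backoffMillis;
    private final Consumer<Object> downstream; // stands in for handing an Exchange to process()

    GiveUpAfterRetries(int maxAttempts, long backoffMillis, Consumer<Object> downstream) {
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
        this.downstream = downstream;
    }

    /** Returns true to retry within the same poll; retryCounter is 0 on the first failure. */
    boolean rollback(int retryCounter, Exception cause) {
        int attemptsSoFar = retryCounter + 1;
        if (attemptsSoFar < maxAttempts) {
            try {
                Thread.sleep(backoffMillis * attemptsSoFar); // linear backoff between retries
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: stop retrying, give up below
            }
        }
        downstream.accept(null); // give up: push a null body so the blocked route can continue
        return false;
    }

    /** Imitates one poll cycle that retries while rollback() says so; returns attempts made. */
    int runPollCycle(Callable<Object> poll) {
        for (int retryCounter = 0; ; retryCounter++) {
            Object result;
            try {
                result = poll.call();
            } catch (Exception e) {
                if (rollback(retryCounter, e)) {
                    continue; // retry immediately, the backoff already happened in rollback()
                }
                return retryCounter + 1;
            }
            downstream.accept(result); // successful poll: hand the result over
            return retryCounter + 1;
        }
    }

    public static void main(String[] args) {
        List<Object> delivered = new ArrayList<>();
        GiveUpAfterRetries strategy = new GiveUpAfterRetries(3, 10, delivered::add);
        int attempts = strategy.runPollCycle(() -> {
            throw new IllegalStateException("simulated S3 failure");
        });
        System.out.println(attempts + " attempts, delivered " + delivered);
    }
}
```

The sleep sits inside rollback() for the same reason as in my strategy:
returning true retries within the same poll, so nothing else gets a chance
to insert a delay.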
>
> Is there a better way of doing this? This seems like a very common use
> case to me, perhaps this should be considered as a configuration of
> pollEnrich() or some other involved components here.
>
> What do I win for the longest email on the group? :p
>
> Best,
> Artur
>
> On Thu, Jun 22, 2017 at 5:36 PM, Artur Jablonski
> <ajablon...@ravenpack.com> wrote:
>
>>  I will try that, thank you for the hint!
>>
>> On Wed, Jun 21, 2017 at 5:40 PM, Taariq Levack <taar...@gmail.com> wrote:
>>
>>> Then you can try the custom pollStrategy.
>>>
>>>
>>> On 21 Jun 2017 10:57, "Artur Jablonski" <ajablon...@ravenpack.com>
>>> wrote:
>>>
>>> > Hello!
>>> >
>>> > Thanks for the hint
>>> >
>>> > So that works... kind of.
>>> >
>>> > First, with timeout(0) it doesn't work, it won't even try to poll for
>>> > anything. I suppose that's how it's supposed to work. Fine.
>>> >
>>> > With timeout(>0) it works, as in it gives up after the given number of
>>> > milliseconds and null is propagated down the route where I can
>>> > catch it in choice() and only process further those not null responses.
>>> >
>>> > This seems a bit brittle to me. Setting the timeout too low could
>>> > make me miss an existing file if some network glitch slowed down the
>>> > request-response cycle, and setting it high seems like a waste when
>>> > the AWS-S3 Consumer throws an exception that clearly tells me that
>>> > the file I want doesn't exist.
>>> >
>>> > So what I am looking for, I guess, is an exception handling mechanism
>>> > for the Consumer, so that I can break polling on a given exception.
>>> >
>>> > I looked here http://camel.apache.org/polling-consumer.html
>>> > and was looking at backoffErrorThreshold, backoffIdleThreshold and
>>> > backoffMultiplier as the only properties that mention error handling,
>>> > but I guess they're not for what I want.
>>> >
>>> > Am I missing something?
>>> >
>>> > Cheers
>>> > Artur
>>> >
>>> >
>>> > On Wed, Jun 21, 2017 at 6:26 AM, Taariq Levack <taar...@gmail.com>
>>> > wrote:
>>> >
>>> > > You can use a timeout of 0 for receiving with no wait, or greater
>>> > > than 0 for waiting a bit.
>>> > > Default is -1
>>> > >
>>> > > http://camel.apache.org/content-enricher.html
>>> > >
>>> > > Cheers,
>>> > > Taariq
>>> > >
>>> > > On 20 Jun 2017 22:12, "Artur Jablonski" <ajablon...@ravenpack.com>
>>> > > wrote:
>>> > >
>>> > > > Right, that almost works.
>>> > > >
>>> > > > Now the problem I have is that when a file I ask for doesn't exist
>>> > > > the consumer seems to be polling for it indefinitely. Is there a
>>> > > > way of saying: hey polling consumer, try n times and then give up.
>>> > > > Throw an error or return null. Don't insist, please.
>>> > > >
>>> > > > Cheers
>>> > > > Artur
>>> > > >
>>> > > > On 20 Jun 2017 9:55 a.m., "Artur Jablonski"
>>> > > > <ajablon...@ravenpack.com> wrote:
>>> > > >
>>> > > > > Thank you so much Gregor!
>>> > > > >
>>> > > > > You pushed me in the right direction. I initially wrote a patch
>>> > > > > for the S3 producer to get me what I wanted, but after seeing
>>> > > > > your reply I read about the content enricher pattern and
>>> > > > > pollEnrich() in Camel.
>>> > > > >
>>> > > > > This SO question was helpful as well:
>>> > > > > https://stackoverflow.com/questions/36948005/how-do-dynamic-from-endpoints-and-exchanges-work-in-camel
>>> > > > >
>>> > > > > In the end, with this route:
>>> > > > >
>>> > > > > from("direct:pollEnrich").routeId("pollEnrich")
>>> > > > >     .pollEnrich()
>>> > > > >     .simple("aws-s3://{{buckets.mybucket}}?amazonS3Client=#amazonS3Client&deleteAfterRead=false&fileName=${in.header.CamelAwsS3Key}")
>>> > > > >     .convertBodyTo(byte[].class)
>>> > > > >     .to("bean:unmarshall")
>>> > > > >
>>> > > > >
>>> > > > > I can now poke the route and set the CamelAwsS3Key header to the
>>> > > > > file I want.
>>> > > > >
>>> > > > > Cheerio!
>>> > > > > Artur
>>> > > > >
>>> > > > >
>>> > > > > On Mon, Jun 19, 2017 at 10:43 PM, Gregor Zurowski <
>>> > > > > gre...@list.zurowski.org> wrote:
>>> > > > >
>>> > > > >> Hi Artur,
>>> > > > >>
>>> > > > >> You should be able to get a single S3 object with the camel-aws
>>> > > > >> component using the "fileName" query parameter on the consumer.
>>> > > > >> See the documentation at
>>> > > > >> https://github.com/apache/camel/blob/master/components/camel-aws/src/main/docs/aws-s3-component.adoc.
>>> > > > >>
>>> > > > >> Gregor
>>> > > > >>
>>> > > > >> On Mon, Jun 19, 2017 at 5:28 PM, Artur Jablonski
>>> > > > >> <ajablon...@ravenpack.com> wrote:
>>> > > > >> > Hello,
>>> > > > >> >
>>> > > > >> > I am trying to get a single object from S3 via Camel and I am
>>> > > > >> > not sure how to do this.
>>> > > > >> >
>>> > > > >> > It seems that the producer endpoint can only upload to S3 and
>>> > > > >> > the consumer endpoint can poll a bucket, but how can I express
>>> > > > >> > a use case of retrieving a single S3 object when I know its
>>> > > > >> > key?
>>> > > > >> >
>>> > > > >> > Best
>>> > > > >> > Artur
>>> > > > >>
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
