Right,
So I managed to get that working, though I am not sure if that's the proper
way of doing this or more of a hack.

So again, I want to use pollEnrich() to fetch a specific file from S3. If a
file doesn't exist then I want to pass a null message down the route. To
simplify things let's say that for ANY error that happens during the poll I
want to retry 3 times (with some backoff in between) and then give up and
pass null. I DON'T want to use the timeout, as it seems brittle, as I
explained in my previous email.

The problem with that is that it seems that by default if there's an error
while polling, pollEnrich() will try again and again indefinitely. Bummer.

Taariq suggested using a custom pollStrategy to handle that scenario
correctly. PollingConsumerPollStrategy has 3 methods, of which
"boolean rollback(Consumer, Endpoint, int, Exception)" is of interest here
as it's the one called when an error happens during a poll. The method is
supposed to return 'true' if you want to retry or 'false' otherwise. But
here's the confusing thing: the DefaultPollingConsumerPollStrategy that's
used if you don't define one always returns false, and yet the polling
doesn't stop when an error happens.

Fine, so after digging into the code here's what I discovered:

pollEnrich() in the Java DSL creates a PollEnricher in the route which,
among other types, implements Processor. PollEnricher in its process()
method resolves a Consumer to which it delegates using one of the
receive()/receiveNoWait()/receive(timeout) methods, depending on whether a
timeout has been set or not. In my case I am not setting a timeout, so the
method it delegates to is Consumer.receive().

At runtime that Consumer is an instance of EventDrivenPollingConsumer that
in turn delegates to the S3Consumer (which is an implementation of
ScheduledBatchPollingConsumer). The receive() method of the
EventDrivenPollingConsumer implementation will BLOCK until there's an
Exchange on its internal queue. An Exchange is put on that queue in the
consumer's own process() method, because EventDrivenPollingConsumer is
itself a Processor.

So here's what happens if polling yields some result (omitting the
arguments of the methods for brevity).

PollEnricher.process()
     EventDrivenPollingConsumer.start()
     EventDrivenPollingConsumer.receive()
     ScheduledPollConsumer.doStart() // this effectively schedules the
                                     // polling on the S3Consumer in a
                                     // separate thread
     BlockingQueue.take() // this blocks this thread until there's an
                          // Exchange on the queue


So now the question is: how does that polled exchange end up on
EventDrivenPollingConsumer's queue so that it can unblock and carry on?

ScheduledPollConsumer.run()  // main entry to the polling task
    S3Consumer.poll()
    EventDrivenPollingConsumer.process()
    BlockingQueue.put() // this effectively unblocks
                        // PollEnricher.process() so processing can carry on


So there you have it: it's the EventDrivenPollingConsumer that creates
the S3Consumer in this case, and it passes itself as a Processor to the
constructor, so the S3Consumer can invoke its process() method.
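
To make the hand-off above easier to picture, here is a minimal sketch
using only the JDK. It is a model of the mechanism, not Camel code: the
class and method names (HandOffSketch, QueueBackedConsumer, pollOnce) are
made up for illustration, and a plain String stands in for the Exchange.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK model of the hand-off; all names here are hypothetical, not Camel API.
public class HandOffSketch {

    // Stands in for EventDrivenPollingConsumer: receive() blocks on an internal queue.
    static class QueueBackedConsumer {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Called by the polling thread when a poll produced something.
        void process(String exchange) {
            queue.add(exchange);
        }

        // Called by the route thread; blocks until process() has put something on the queue.
        String receive() throws InterruptedException {
            return queue.take();
        }
    }

    // Schedules one poll on a separate thread, then blocks in receive() until it delivers.
    static String pollOnce(String polledValue) {
        QueueBackedConsumer consumer = new QueueBackedConsumer();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // A successful poll hands its result to the consumer's process() method.
            scheduler.schedule(() -> consumer.process(polledValue), 50, TimeUnit.MILLISECONDS);
            return consumer.receive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the poll", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnce("s3-object-body"));
    }
}
```

If the scheduled task threw instead of calling process(), nothing would
ever reach the queue and receive() would never return.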


Ok, so what happens in my use case? S3Consumer.poll() throws an exception
from the AWS service, so EventDrivenPollingConsumer.process() is never
called. The scheduled polling task runs over and over again, and
PollEnricher.process() stays blocked forever.

So back to the PollingConsumerPollStrategy and its rollback() method. Like
I said, even if I return false from it, the polling will continue, because
the polling task is never stopped. So what I ended up with was this:

https://gist.github.com/anonymous/195f2b6c835db82decf1649969a717fe

So basically after 3 attempts I create an Exchange with a null body and
call the process() method with it from the rollback() method so that the
route can continue. Then I return false and all looks good.

Please note the dodgy Thread.sleep() which I put there for the backoff
between retries, because all the backoff* configurations from
ScheduledPollConsumer work BETWEEN polls and not within the same poll,
which is what happens if I return 'true' from the rollback() method.
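
For readers who can't open the gist, the idea can be modelled with the JDK
alone. This is a rough sketch of the approach, not the gist itself and not
Camel code: GiveUpSketch, Message and pollOrGiveUp are hypothetical names,
a Callable stands in for the S3 poll, and the linear backoff is just an
assumption for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK model of "retry a few times, then pass null on"; names are hypothetical, not Camel API.
public class GiveUpSketch {

    // BlockingQueue rejects null elements, so a null body travels inside a wrapper.
    static final class Message {
        final String body;
        Message(String body) { this.body = body; }
    }

    // Runs the poll on a separate thread; the caller blocks on the queue like PollEnricher does.
    static String pollOrGiveUp(Callable<String> poll, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        Thread poller = new Thread(() -> {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    queue.add(new Message(poll.call())); // success: hand the result over
                    return;
                } catch (Exception cause) {
                    if (attempt == maxAttempts) {
                        queue.add(new Message(null)); // give up: unblock the receiver with a null body
                        return;
                    }
                    try {
                        Thread.sleep(backoffMillis * attempt); // backoff within the same poll
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        queue.add(new Message(null)); // interrupted: stop retrying, still unblock
                        return;
                    }
                }
            }
        });
        poller.setDaemon(true);
        poller.start();
        try {
            return queue.take().body; // blocks until the poller delivers a result or gives up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the poll", e);
        }
    }

    public static void main(String[] args) {
        // A poll that always fails, e.g. because the requested key does not exist.
        String body = pollOrGiveUp(() -> { throw new IllegalStateException("no such key"); }, 3, 10);
        System.out.println(body);
    }
}
```

The important part is that the failure path also puts something on the
queue; without that, the receiving side has nothing to wake it up.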

Is there a better way of doing this? This seems like a very common use case
to me, perhaps this should be considered as a configuration of pollEnrich()
or some other involved components here.

What do I win for the longest email on the group? :p

Best,
Artur

On Thu, Jun 22, 2017 at 5:36 PM, Artur Jablonski <ajablon...@ravenpack.com>
wrote:

>  I will try that, thank you for the hint!
>
> On Wed, Jun 21, 2017 at 5:40 PM, Taariq Levack <taar...@gmail.com> wrote:
>
>> Then you can try the custom pollStrategy.
>>
>>
>> On 21 Jun 2017 10:57, "Artur Jablonski" <ajablon...@ravenpack.com> wrote:
>>
>> > Hello!
>> >
>> > Thanks for the hint
>> >
>> > So that works... kind of.
>> >
>> > First, with timeout(0) it doesn't work, it won't even try to poll for
>> > anything. I suppose that's how it's supposed to work. Fine.
>> >
>> > With timeout(>0) it works, as in it gives up after the given number of
>> > milliseconds and null is propagated down the route where I can
>> > catch it in choice() and only process further those not null responses.
>> >
>> > This seems to me a bit brittle. I mean setting this timeout too low,
>> could
>> > potentially make me miss an existing file if some network glitch
>> happened
>> > that would slow down the request-response cycle, and setting it high
>> seems
>> > like a waste when the AWS-S3 Consumer throws an exception that clearly
>> > tells me that file that I want doesn't exist.
>> >
>> > So what I am looking for, I guess, is exception handling mechanism for
>> > Consumer, so that I can break polling on given exception.
>> >
>> > I looked here http://camel.apache.org/polling-consumer.html
>> > and was looking at backoffErrorThreshold, backoffIdleThreshold and
>> > backoffMultiplier as the only properties that mention error handling,
>> but I
>> > guess they're not for what I want.
>> >
>> > Am I missing something?
>> >
>> > Cheers
>> > Artur
>> >
>> >
>> > On Wed, Jun 21, 2017 at 6:26 AM, Taariq Levack <taar...@gmail.com>
>> wrote:
>> >
>> > > You can use a timeout of 0 for receiving with no wait, or greater
>> than 0
>> > > for waiting a bit.
>> > > Default is -1
>> > >
>> > > http://camel.apache.org/content-enricher.html
>> > >
>> > > Cheers,
>> > > Taariq
>> > >
>> > > On 20 Jun 2017 22:12, "Artur Jablonski" <ajablon...@ravenpack.com>
>> > wrote:
>> > >
>> > > > Right, that almost works.
>> > > >
>> > > > Now the problem I have is that when a file I ask for doesn't exist
>> the
>> > > > consumer seems to be polling for it indefinitely. Is there a way of
>> > > saying:
>> > > > hey polling consumer, try n times and then give up. Throw an error
>> or
>> > > > return null. Don't insist, please.
>> > > >
>> > > > Cheers
>> > > > Artur
>> > > >
>> > > > On 20 Jun 2017 9:55 a.m., "Artur Jablonski" <
>> ajablon...@ravenpack.com>
>> > > > wrote:
>> > > >
>> > > > > Thank you so much Gregor!
>> > > > >
>> > > > > You pushed me in the right direction. I initially wrote a patch
>> for
>> > S3
>> > > > > producer to get me what I wanted, but after seeing your reply I
>> read
>> > > > about
>> > > > > the content enricher pattern and pollEnrich() in Camel.
>> > > > >
>> > > > > This SO was helpful as well
>> > > > > https://stackoverflow.com/questions/36948005/how-do-
>> > > > > dynamic-from-endpoints-and-exchanges-work-in-camel
>> > > > >
>> > > > > In the end, with this route:
>> > > > >
>> > > > > from("direct:pollEnrich").routeId("pollEnrich")
>> > > > >                             .pollEnrich()
>> > > > >                             .simple("aws-s3://{{buckets.
>> > > > mybucket}}?amazonS3Client=#amazonS3Client&deleteAfterRead=false&
>> > > > fileName=${in.header.CamelAwsS3Key}")
>> > > > >                             .convertBodyTo(byte[].class)
>> > > > >                             .to("bean:unmarshall")
>> > > > >
>> > > > >
>> > > > > I can now poke the route and set the CamelAwsS3Key header to the
>> > file I
>> > > > > want.
>> > > > >
>> > > > > Cheerio!
>> > > > > Artur
>> > > > >
>> > > > >
>> > > > > On Mon, Jun 19, 2017 at 10:43 PM, Gregor Zurowski <
>> > > > > gre...@list.zurowski.org> wrote:
>> > > > >
>> > > > >> Hi Artur,
>> > > > >>
>> > > > >> You should be able to get a single S3 object with the camel-aws
>> > > > >> component using the "fileName" query parameter on the consumer.
>> See
>> > > > >> the documentation at
>> > > > >> https://github.com/apache/camel/blob/master/components/camel
>> > > > >> -aws/src/main/docs/aws-s3-component.adoc.
>> > > > >>
>> > > > >> Gregor
>> > > > >>
>> > > > >> On Mon, Jun 19, 2017 at 5:28 PM, Artur Jablonski
>> > > > >> <ajablon...@ravenpack.com> wrote:
>> > > > >> > Hello,
>> > > > >> >
>> > > > >> > I am trying to get a single object from S3 via Camel and I am
>> not
>> > > sure
>> > > > >> how
>> > > > >> > to do this.
>> > > > >> >
>> > > > >> > It seems that the producer endpoint can only upload to S3 and
>> the
>> > > > >> consumer
>> > > > >> > endpoint can poll a bucket, but how can I express a use case of
>> > > > >> retrieving a
>> > > > >> > single S3 object when I know its key?
>> > > > >> >
>> > > > >> > Best
>> > > > >> > Artur
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
