Hi John,

Thanks a lot for the great explanation and the links.
After I sent the question, I researched EOS a bit more and I'm currently
testing it out.

I'll read those links and see what I come up with!

Thanks and have a great day!

--
Alessandro Tagliapietra

On Tue, Jul 9, 2019 at 2:45 PM John Roesler <j...@confluent.io> wrote:

> Hi Alessandro,
>
> Sorry if I'm missing some of the context, but could you just keep
> retrying the API call inside a loop? This would block any other
> processing by the same thread, but it would allow Streams to stay up
> in the face of transient failures. Otherwise, I'm afraid that throwing
> an exception is the right thing to do. Streams would re-process the
> record in question when it starts back up, but you'd have to re-start
> it. You can do that programmatically, but it's a bit heavyweight as a
> response to a transient API call failure.
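[Editor's note] The retry-then-crash approach John describes might be sketched like this; `callExternalService` is a made-up stand-in for the real API call, and the attempt count and backoff are arbitrary:

```java
public class RetrySketch {
    // Hypothetical external call that fails transiently on the first two tries.
    static String callExternalService(int attempt) {
        if (attempt < 3) throw new RuntimeException("transient failure");
        return "ok";
    }

    // Retry up to maxAttempts with a fixed backoff; if all attempts fail,
    // rethrow so Streams shuts down and the record is reprocessed on restart
    // (at-least-once semantics).
    static String callWithRetry(int maxAttempts, long backoffMs) throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callExternalService(attempt);
            } catch (RuntimeException e) {
                last = e;
                Thread.sleep(backoffMs); // fixed backoff; exponential is also common
            }
        }
        throw last;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(callWithRetry(5, 10));
    }
}
```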
>
> For reference, this is one of several problems that come up when you
> need to call out to external services during processing. Streams
> currently lacks full support to make this a really pleasant
> experience, but it's a perennial topic of discussion. See
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams
> and
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
> for a couple of attempts to wrestle with the domain.
>
> To answer your latter question, the store should be returned to its
> prior state when you restart, but if you want to be absolutely sure
> this happens, you need to enable EOS. That will have the side-effect
> of discarding any local state after a crash, though, which makes the
> "crash and recover" strategy even more heavyweight.
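[Editor's note] For reference, at the time of this thread EOS was enabled with a single Streams config setting (the `exactly_once` value predates the later `exactly_once_v2`):

```properties
# Enable exactly-once processing guarantees in Kafka Streams.
# The default is at_least_once.
processing.guarantee=exactly_once
```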
>
> I'd recommend wrapping the API call in a retry loop that's as long as
> you can tolerate and then crashing if you still don't get through. Be
> sure to also look through the docs and find any heartbeat configs you
> need to set. Off the top of my head, I think "max poll interval" at
> least needs to be set bigger than your maximum expected pause.
> Probably 2x the total retry-loop time would be a good choice.
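[Editor's note] Assuming the app is configured through a `java.util.Properties` object, the sizing John suggests might look like this; the 30-second retry budget is a made-up number:

```java
import java.util.Properties;

public class PollIntervalSizing {
    public static void main(String[] args) {
        // Hypothetical worst-case time spent in the retry loop for one record.
        long retryLoopMillis = 30_000L;

        Properties props = new Properties();
        // max.poll.interval.ms must exceed the longest pause the retry loop
        // can cause, so a blocked task isn't kicked out of the consumer group;
        // roughly 2x the retry-loop time is suggested above.
        props.put("max.poll.interval.ms", String.valueOf(2 * retryLoopMillis));
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```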
>
> I hope this helps,
> -John
>
> On Fri, Jul 5, 2019 at 6:30 PM Alessandro Tagliapietra
> <tagliapietra.alessan...@gmail.com> wrote:
> >
> > Hello everyone,
> >
> > I'm looking into a way to reprocess messages in the case of soft errors
> > (not exceptions).
> > For example we have a topology that does this:
> >     input stream -> filtering/flatmap -> window and aggregate
> >
> > in our aggregate step (which maybe should be moved into a separate step)
> > we make an API call to one of our services.
> >
> > What I would like to do is reprocess that message when the API call
> > fails, or even better, if possible, just the window computation.
> >
> > By reading this
> >
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processing-guarantees
> > if I'm not mistaken, with the default at-least-once semantics, if I
> > throw an exception the topology will reprocess the messages after the
> > last commit. Is it possible instead to just soft-retry the last message,
> > without throwing an exception and without also reprocessing older,
> > correctly processed messages?
> >
> > Also, if my topology starts from a stream and uses multiple stores
> > before windowing, and there's an error in the windowing step, what
> > happens to the changes in those stores? When the message is reprocessed,
> > will each store be in the state it was in after it processed the message
> > on the first try?
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
>