Hi Alessandro,

Sorry if I'm missing some of the context, but could you just keep
retrying the API call inside a loop? This would block any other
processing by the same thread, but it would allow Streams to stay up
in the face of transient failures. Otherwise, I'm afraid that throwing
an exception is the right thing to do. Streams would reprocess the
record in question when it starts back up, but you'd have to restart
it yourself. You can do that programmatically, but it's a bit
heavyweight as a response to a transient API call failure.
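To make that concrete, here's roughly what I mean by the retry loop,
sketched outside of Streams (the attempt count, backoff, and names are
placeholders you'd tune for your service, not anything prescribed by the
Streams API):

```java
import java.util.function.Supplier;

// Sketch: retry a flaky call a bounded number of times with a fixed
// backoff; if it still fails, rethrow so the app can crash and recover.
public class RetryExample {
    static <T> T callWithRetries(Supplier<T> call, int maxAttempts, long backoffMs) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // transient failure: wait and try again
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break; // interrupted: stop retrying
                    }
                }
            }
        }
        throw last; // out of retries: let the exception propagate
    }

    public static void main(String[] args) {
        // Simulated service that fails twice before succeeding.
        int[] calls = {0};
        String result = callWithRetries(() -> {
            if (calls[0]++ < 2) throw new RuntimeException("transient failure");
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

You'd call something like this from inside your processor where you
currently make the API call.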

For reference, this is one of several problems that come up when you
need to call out to external services during processing. Streams
currently lacks full support to make this a really pleasant
experience, but it's a perennial topic of discussion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams
and 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
for a couple of attempts to wrestle with the domain.

To answer your latter question, the store should be returned to its
prior state when you restart, but if you want to be absolutely sure
this happens, you need to enable EOS (exactly-once semantics). That
will have the side effect of discarding any local state after a
crash, though, which makes the
"crash and recover" strategy even more heavyweight.
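For reference, turning on EOS is just a config flag. A minimal sketch of
the relevant properties (the application id and broker address here are
made-up placeholders; "processing.guarantee" and "exactly_once" are the
actual Streams config key and value):

```java
import java.util.Properties;

// Sketch (not a full Streams app): the config needed to enable
// exactly-once processing in Kafka Streams.
public class EosConfigExample {
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // placeholder app id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("processing.guarantee", "exactly_once"); // default is "at_least_once"
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("processing.guarantee"));
    }
}
```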

I'd recommend wrapping the API call in a retry loop that's as long as
you can tolerate and then crashing if you still don't get through. Be
sure to also look through the docs for any timeout and heartbeat
configs you need to set. Off the top of my head, max.poll.interval.ms
at least needs to be set larger than your maximum expected pause.
Probably 2x the total retry-loop time would be a good choice.
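For example, if you retried 5 times with a 30-second backoff (made-up
numbers for illustration), the sizing would look like this.
max.poll.interval.ms is the underlying consumer config, which Streams
passes through; its default is 300000 (5 minutes):

```java
import java.util.Properties;

// Sketch: sizing max.poll.interval.ms to cover the worst-case retry loop,
// with ~2x headroom. The attempt count and backoff are assumptions.
public class PollIntervalExample {
    static long maxPollIntervalMs(long maxAttempts, long backoffMs) {
        long worstCaseRetryMs = maxAttempts * backoffMs;
        return 2 * worstCaseRetryMs; // ~2x headroom over the worst case
    }

    public static void main(String[] args) {
        long value = maxPollIntervalMs(5, 30_000L); // 5 retries x 30s backoff

        Properties props = new Properties();
        props.put("max.poll.interval.ms", Long.toString(value));
        System.out.println(props.getProperty("max.poll.interval.ms")); // 300000
    }
}
```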

I hope this helps,
-John

On Fri, Jul 5, 2019 at 6:30 PM Alessandro Tagliapietra
<tagliapietra.alessan...@gmail.com> wrote:
>
> Hello everyone,
>
> I'm looking into a way to reprocess messages in case of soft errors (not
> exceptions).
> For example we have a topology that does this:
>     input stream -> filtering/flatmap -> window and aggregate
>
> in our aggregate step (maybe should be moved into an additional step) we
> make an API call to one of our services.
>
> What I would like to do is reprocess that message when the API call
> fails, or even better, if possible, just redo the window computation.
>
> By reading this
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processing-guarantees
> if I'm not mistaken, with the default at-least-once semantics, if I throw
> an exception the topology will reprocess the messages after the last
> commit. Is it possible instead to just soft-retry the last message,
> without throwing an exception and without also reprocessing older,
> correctly processed messages?
>
> Also, if my topology starts from a stream and uses multiple stores before
> windowing, if there's an error in the windowing step, what happens to the
> stores changes? When the message is reprocessed, will the store be in the
> state it was after it processed the message on the first try?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
