Hi John, thanks a lot for the great explanation and the links. After I sent the question I researched EOS a bit more, and I'm currently testing it out.
I'll read those links and see what I come up with! Thanks and have a great day!

--
Alessandro Tagliapietra

On Tue, Jul 9, 2019 at 2:45 PM John Roesler <j...@confluent.io> wrote:
> Hi Alessandro,
>
> Sorry if I'm missing some of the context, but could you just keep
> retrying the API call inside a loop? This would block any other
> processing by the same thread, but it would allow Streams to stay up
> in the face of transient failures. Otherwise, I'm afraid that throwing
> an exception is the right thing to do. Streams would re-process the
> record in question when it starts back up, but you'd have to restart
> it. You can do that programmatically, but it's a bit heavyweight as a
> response to a transient API call failure.
>
> For reference, this is one of several problems that come up when you
> need to call out to external services during processing. Streams
> currently lacks full support to make this a really pleasant
> experience, but it's a perennial topic of discussion. See
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams
> and
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
> for a couple of attempts to wrestle with the domain.
>
> To answer your latter question, the store should be returned to its
> prior state when you restart, but if you want to be absolutely sure
> this happens, you need to enable EOS. That will have the side effect
> of discarding any local state after a crash, though, which makes the
> "crash and recover" strategy even more heavyweight.
>
> I'd recommend wrapping the API call in a retry loop that's as long as
> you can tolerate and then crashing if you still don't get through. Be
> sure to also look through the docs and find any heartbeat configs you
> need to set. Off the top of my head, I think "max.poll.interval.ms" at
> least needs to be set bigger than your maximum expected pause.
> Probably 2x the total retry-loop time would be a good choice.
>
> I hope this helps,
> -John
>
> On Fri, Jul 5, 2019 at 6:30 PM Alessandro Tagliapietra
> <tagliapietra.alessan...@gmail.com> wrote:
> >
> > Hello everyone,
> >
> > I'm looking into a way to reprocess messages in case of soft errors (not
> > exceptions). For example, we have a topology that does this:
> >
> > input stream -> filtering/flatmap -> window and aggregate
> >
> > In our aggregate step (maybe it should be moved into an additional step)
> > we make an API call to one of our services.
> >
> > What I would like to do is reprocess that message, or even better, if
> > possible, just the window computation, when the API call fails.
> >
> > By reading
> > https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processing-guarantees
> > if I'm not mistaken, with the default at-least-once semantics, if I throw
> > an exception the topology will reprocess the messages after the last
> > commit. Is it possible instead to just soft-retry the last message,
> > without throwing an exception and possibly reprocessing older, correctly
> > processed messages?
> >
> > Also, if my topology starts from a stream and uses multiple stores before
> > windowing, and there's an error in the windowing step, what happens to
> > the store changes? When the message is reprocessed, will the store be in
> > the state it was in after it processed the message on the first try?
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
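John's "retry for as long as you can tolerate, then crash" suggestion could be sketched as a small helper like the one below. This is a hypothetical example, not code from the thread: the class name, attempt count, and backoff are all illustrative, and in a real topology you would call it from inside your processor/aggregator and let the final exception propagate so Streams shuts down and can be restarted.

```java
import java.util.concurrent.Callable;

public class RetryLoop {
    // Retries `call` up to maxAttempts times, sleeping backoffMs between
    // attempts. Blocks the stream thread while retrying (as John notes),
    // and rethrows the last failure once the budget is exhausted, so the
    // caller can let Streams crash and be restarted.
    public static <T> T callWithRetries(Callable<T> call, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }
}
```

Note that the total worst-case time here (maxAttempts * (call timeout + backoffMs)) is the "maximum expected pause" that the heartbeat-related configs must accommodate.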
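The two configuration points discussed above (EOS for store consistency, and `max.poll.interval.ms` at roughly 2x the total retry-loop time) might look like this in a Streams properties setup. A sketch under stated assumptions: the config keys are the standard Kafka names, but the 600000 ms value assumes an illustrative five-minute worst-case retry budget, which you would replace with your own.

```java
import java.util.Properties;

public class StreamsRetryConfig {
    // Builds a hypothetical Streams configuration following the advice in
    // the thread. Both values are illustrative, not recommendations.
    public static Properties buildProps() {
        Properties props = new Properties();
        // Enable EOS so stores are rolled back consistently after a crash
        // (at the cost of discarding and rebuilding local state).
        props.put("processing.guarantee", "exactly_once");
        // Must exceed the longest pause the retry loop can cause:
        // 600000 ms = 10 minutes, i.e. ~2x an assumed 5-minute retry budget.
        props.put("max.poll.interval.ms", "600000");
        return props;
    }
}
```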