Thanks for the update, Matthias!

Other than Bruno’s good points, this proposal looks good to me. 


On Thu, May 14, 2020, at 07:17, Bruno Cadonna wrote:
> Hi Matthias,
> Thank you for the KIP. I like your KIP.
> Here my feedback:
> 1. The KIP is not clear about what should happen when
> expires. To facilitate the mapping from the error users might
> encounter due to timeouts to this KIP, it would be good to state the
> error that will be thrown when expires.
> 2. The KIP does also not clearly state how is
> measured. Does the time start with the first timeout exception and
> then run until either the timeout expires or the task receives a
> successful reply? Or is it started each time the task is processed by
> the stream thread and stopped when its turn is over and when the sum
> of the single times without a successful reply reaches the timeout an
> error is thrown?
> Best,
> Bruno
> On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax <> wrote:
> >
> > John, Guozhang,
> >
> > thanks a lot for your feedback. I updated the KIP on a slightly
> > different angle: instead of using retries, we should switch to a timeout
> > based approach. I also extended the KIP to deprecate producer/admin
> > `retries` config.
> >
> > I like the proposal to skip a task if a client TimeoutException occurs
> > and just retry it later; update the KIP accordingly. However, I would
> > not retry forever by default. In general, all points you raised are
> > valid and the question is just what _default_ do we want to have. Given
> > the issue that tasks might get "out-of-sync" regarding their event-time
> > progress and that inexperience users might not do proper monitoring, I
> > prefer to have a "reasonable" default timeout if a task does not make
> > progress at all and fail for this case.
> >
> > I would also argue (following Guozhang) that we don't necessarily need
> > new metrics. Monitoring the number of alive threads (recently added),
> > consumer lag, processing rate etc should give an operator enough insight
> > into the application. I don't see the need atm to add some specify "task
> > timeout" metrics.
> >
> > For the issue of cascading failures, I would want to exclude it from
> > this KIP to keep it focused.
> >
> >
> > -Matthias
> >
> >
> > On 2/27/20 1:31 PM, Guozhang Wang wrote:
> > > Hello John,
> > >
> > > I'll make note that you owe me a beer now :)
> > >
> > > I think I'm leaning towards your approach as well based on my observations
> > > on previously reported timeout exceptions in the past. I once left some
> > > thoughts on Matthias' PR here
> > >
> > and I
> > > think I can better summarize my thoughts in this thread:
> > >
> > > 1) First of all, we need to think from user's perspective, what they'd
> > > really want to be notified:
> > >
> > > a. "If my application cannot make progress temporarily due to various
> > > transient issues (John had listed several examples already), just handle
> > > that internally and I do not wanna be notified and worry about how to tune
> > > my timeout configs at all".
> > > b. "If my application cannot make progress for a long time, which is
> > likely
> > > due to a bad config, a human error, network issues, etc such that I should
> > > be involved in the loop of trouble shooting, let me know sooner than
> > later".
> > >
> > > and what they'd not preferred but may happen today:
> > >
> > > c. "one transient error cause a thread to die, but then after tasks
> > > migrated everything goes to normal; so the application silently lost a
> > > thread without letting me know"; in fact, in such cases even a cascading
> > > exception that eventually kills all thread may be better since at
> > least the
> > > users would be notified.
> > >
> > > Based on that, instead of retrying immediately at the granularity each
> > > blocking call, it should be sufficient to only consider the handling logic
> > > at the thread level. That is, within an iteration of the thread, it would
> > > try to:
> > >
> > > * initialized some created tasks;
> > > * restored some restoring tasks;
> > > * processed some running tasks who have buffered records that are
> > > processable;
> > > * committed some tasks.
> > >
> > > In each of these steps, we may need to make some blocking calls in the
> > > underlying embedded clients, and if either of them timed out, we would not
> > > be able to make progress partially or not being able to make any progress
> > > at all. If we still want to set a configured value of "retries", I think a
> > > better idea would be to say "if we cannot make progress for consecutive N
> > > iterations of a thread, the user should be notified".
> > >
> > > ---------------
> > >
> > > 2) Second, let's consider what's a good way to notify the user. Today our
> > > way of notification is simple: throw the exception all the way up to
> > user's
> > > uncaught-exception-handler (if it's registered) and let the thread
> > die. I'm
> > > wondering if we could instead educate users to watch on some key metrics
> > > for "progress indicate" than relying on the exception handler though. Some
> > > candidates in mind:
> > >
> > > * consumer-lag: this is for both source topics and for repartition topics,
> > > it indicates if one or more of the tasks within each sub-topology is
> > > lagging or not; in the case where *some or all* of the threads cannot make
> > > progress. E.g. if a downstream task's thread is blocked somehow while its
> > > upstream task's thread is not, then the consumer-lag on the repartition
> > > topic would keep growing.
> > >
> > > * *idle* state: this is an idea we discussed in
> > >, i.e. to introduce an
> > > instance-level new state, if all threads of the instance cannot make
> > > progress (primarily for the reason that it cannot talk to the brokers).
> > >
> > > * process-rate: this is at thread-level. However if some tasks cannot make
> > > progress while others can still make progress within a thread, its
> > > process-rate would now drop to zero and it's a bit hard to indicate
> > > compared with comsumer-lag.
> > >
> > > If we feel that relying on metrics is better than throwing the exception
> > > and let the thread die, then we would not need to have the "retry" config
> > > as well.
> > >
> > > ---------------
> > >
> > > 3) This maybe semi-related to the timeout itself, but as I mentioned today
> > > one common issue we would need to resolve is to lose a thread BUT not
> > > losing the whole instance. In other words, we should consider when we have
> > > to throw an exception from a thread (not due to timeouts, but say due to
> > > some fatal error), should we just kill the corresponding thread or should
> > > we be more brutal and just kill the whole instance instead. I'm happy to
> > > defer this to another discussion thread but just bring this up here.
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Feb 27, 2020 at 10:40 AM John Roesler <> wrote:
> > >
> > >> Hi Matthias,
> > >>
> > >> Thanks for the proposal! I think this will be a wonderful improvement
> > >> to Streams. In particular, thanks for the motivation. It would indeed
> > >> be nice not to have to set long timeout configs and block individual
> > >> client requests in order to cope with transient slow responses.
> > >>
> > >> I'm very well aware that this might make me sound like a crazy person,
> > >> but one alternative I'd like to consider is not bounding the retries at
> > >> all.
> > >> Instead, Streams would just skip over timed-out tasks and try again
> > >> on the next iteration, as you proposed, but would continue to do so
> > >> indefinitely. Clearly, we shouldn't do such a thing silently, so I'd
> > >> further
> > >> propose to log a warning every time a task times out and also maintain
> > >> a new metric indicating task timeouts.
> > >>
> > >> To see why this might be attractive, let me pose a hypothetical
> > >> installation
> > >> which has thousands of Streams instances, maybe as part of hundreds of
> > >> applications belonging to dozens of teams. Let's also assume there is a
> > >> single broker cluster serving all these instances. Such an
> > environment has
> > >> a number of transient failure modes:
> > >> * A single broker instance may become slow to respond due to hardware
> > >> failures (e.g., a bad NIC) or other environmental causes (CPU competition
> > >> with co-resident processes, long JVM GC pauses, etc.). Single-broker
> > >> unavailability could cause some tasks to time out while others can
> > proceed
> > >> in an individual Streams instance.
> > >> * The entire broker cluster could become temporarily unavailable
> > (consider:
> > >> a faulty firewall configuration gets deployed, severing all Streams
> > >> instances
> > >> from the brokers).
> > >> * A faulty security configuration may temporarily sever whole application
> > >> from
> > >> the brokers.
> > >> * Any number of causes could likewise sever a single instance in a single
> > >> application from all brokers.
> > >> * Finally, networking problems can disconnect arbitrary pairs of Streams
> > >> instances and Broker instances.
> > >>
> > >> This is not an accounting of all possible failure modes, obviously,
> > but the
> > >> point is that, in a large, decentralized organization, you can experience
> > >> lots of transient failures that have some features in common:
> > >> F1. It's someone else's fault, and someone else must take action to
> > fix it.
> > >> F2. It will take "human time" to fix it. I.e., hours, not milliseconds.
> > >> F3. A single failure can affect "everyone" (e.g., one broker with
> > long GCs
> > >> can cause timeouts in all thousands of instances over all dozens of
> > teams).
> > >>
> > >> As an operator belonging to one team, whether we bound retries or not,
> > >> I would need to be alerted when the app stops making progress, I'd need
> > >> to investigate, and in the above cases, I'd need to escalate to the
> > network
> > >> and/or broker infrastructure teams.
> > >>
> > >> Clearly, I can be alerted either by threads dying or by non-progress
> > >> metrics.
> > >> As a responsible operator, I'd have alerts on _both_, so we shouldn't
> > >> consider
> > >> either signal to be "louder" or more reliable than the other.
> > >>
> > >> A side observation: in a lot of the failure modes, a specific task won't
> > >> be able
> > >> to make progress no matter which thread or instance it's on (i.e., if the
> > >> transaction coordinator for one of its output partitions is slow or
> > >> unresponsive).
> > >> Therefore, killing the thread with a bounded retry config would only
> > result
> > >> in a cascade of thread deaths across all my instances until either I run
> > >> out of
> > >> threads or the incident is resolved.
> > >>
> > >> The key questions to me are:
> > >> Q1. Do we want to continue trying to make what progress we can while
> > >> the incident is being investigated and remediated?
> > >> Q2. Should I (the operator for a single team) have to take any action
> > once
> > >> the infrastructure failures are resolved?
> > >>
> > >> We can paraphrase these as, "do you want your business to grind to a halt
> > >> due to a single failure?", and "do you want everyone to stay up all night
> > >> waiting for a fix so they can all restart their applications?"
> > >>
> > >> Just from the operator/business perspective, it seems like we want:
> > >> Q1:yes and Q2:no, which in combination with F1,2,3 above indicates
> > >> to me that it would be better for Streams to just keep retrying
> > >> indefinitely.
> > >>
> > >> There is one point I think you've mentioned to me in the past that it
> > >> may not be _safe_ to just quit working on one specific task while
> > >> progressing on others. If we have a repartition topic sourced by
> > >> two tasks T1 and T2, and feeding a windowed aggregation (for example),
> > >> then failing to process T1 while continuing on T2 for a long time
> > >> would cause a lot of timestamp skew, and could ultimately result in all
> > >> those delayed records in T1 being out of grace period by the time they
> > >> get processed. Arguably, this is a completely normal and expected
> > >> situation in a distributed system, which is why we have grace period to
> > >> begin with, but since the cause of this particular skew is inside of
> > >> Streams, it would be possible and nice to detect and avoid the situation.
> > >>
> > >> However, we should note that killing a single thread that hosts T1 will
> > >> _not_ deterministically halt processing on T2, nor will stopping the
> > >> single instance that hosts T1, since T2 might be on another instance.
> > >> We would need a cluster-wide broadcast of some kind to either halt
> > >> all processing on all tasks, or (more sophisticated) to halt processing
> > >> on T2 when we detect non-progress of T1.
> > >>
> > >> Depending on the failure mode, it's possible that just shuffling the
> > tasks
> > >> around could let us start making progress again, for example when only
> > >> a single Streams instance can't reach one or more brokers. However, this
> > >> objective is not accomplished by just stopping one thread, we would need
> > >> to relocate all tasks off the affected instance to attempt this
> > >> remediation.
> > >>
> > >> A separate thing to point out is that, just an instance being unavailable
> > >> for processing does not imply that it is also unavailable for querying.
> > >> Especially in light of KIP-535, where we started to support querying
> > >> "stale" stores, it seems worthwhile to keep the threads and instances
> > >> alive, even if they pause processing.
> > >>
> > >> Well, if you've made it this far, congratulations. I owe you a beer.
> > I hope
> > >> you don't interpret the length of this email as reflective of my
> > zeal. I'm
> > >> also ok with a bounded retries config of some kind, but I wanted to be
> > >> sure we've considered the above effects.
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >>
> > >> On Sat, Feb 22, 2020, at 00:51, Matthias J. Sax wrote:
> > > Hi,
> > >
> > > I would like to propose KIP-572 to make Kafka Streams more robust with
> > > regard to timeout exception handling.
> > >
> > >
> > > eouts+and+retries+in+Kafka+Streams
> > >
> > > Note, that there is a long list of rejected alternatives that can be
> > > used as starting point for the discussion. In fact, I am not sure if
> > > one of those listed alternative might be better than the current
> > > proposal -- I just had to pick one design for now (the reason why I
> > > picked the current design is that it's semantically fully backward
> > > compatible and does not introduce any new configs).
> > >
> > > Looking forward to your feedback.
> > >
> > > -Matthias
> > >>>
> > >>
> > >
> > >
> >

Reply via email to