Thanks for the update, Matthias! Other than Bruno’s good points, this proposal looks good to me.
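For anyone skimming the thread: the direction discussed below replaces retry-count handling with a task-level timeout. A minimal sketch of what setting it might look like (the config name is from the KIP discussion; the value is purely illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // per the KIP-572 discussion: how long a task may make no progress
    // before Streams gives up; the 5-minute value is just an example
    props.put("task.timeout.ms", 300000L);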
Thanks,
John

On Thu, May 14, 2020, at 07:17, Bruno Cadonna wrote:
> Hi Matthias,
>
> Thank you for the KIP. I like your KIP.
>
> Here is my feedback:
>
> 1. The KIP is not clear about what should happen when task.timeout.ms
> expires. To facilitate the mapping from the errors users might
> encounter due to timeouts to this KIP, it would be good to state the
> error that will be thrown when task.timeout.ms expires.
>
> 2. The KIP also does not clearly state how task.timeout.ms is
> measured. Does the time start with the first timeout exception and
> then run until either the timeout expires or the task receives a
> successful reply? Or is it started each time the task is processed by
> the stream thread and stopped when its turn is over, with an error
> thrown once the sum of the individual times without a successful
> reply reaches the timeout?
>
> Best,
> Bruno
>
> On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > John, Guozhang,
> >
> > thanks a lot for your feedback. I updated the KIP to take a slightly
> > different angle: instead of using retries, we should switch to a
> > timeout-based approach. I also extended the KIP to deprecate the
> > producer/admin `retries` config.
> >
> > I like the proposal to skip a task if a client TimeoutException
> > occurs and just retry it later, and I updated the KIP accordingly.
> > However, I would not retry forever by default. In general, all
> > points you raised are valid, and the question is just what _default_
> > we want to have. Given the issue that tasks might get "out-of-sync"
> > regarding their event-time progress, and that inexperienced users
> > might not do proper monitoring, I prefer to have a "reasonable"
> > default timeout and to fail if a task makes no progress at all
> > within it.
> >
> > I would also argue (following Guozhang) that we don't necessarily
> > need new metrics. Monitoring the number of alive threads (recently
> > added), consumer lag, processing rate, etc. should give an operator
> > enough insight into the application. I don't see the need atm to add
> > specific "task timeout" metrics.
> >
> > For the issue of cascading failures, I would want to exclude it from
> > this KIP to keep it focused.
> >
> >
> > -Matthias
> >
> >
> > On 2/27/20 1:31 PM, Guozhang Wang wrote:
> > > Hello John,
> > >
> > > I'll make note that you owe me a beer now :)
> > >
> > > I think I'm leaning towards your approach as well, based on my
> > > observations of previously reported timeout exceptions. I once
> > > left some thoughts on Matthias' PR here
> > > https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510
> > > and I think I can better summarize my thoughts in this thread:
> > >
> > > 1) First of all, we need to think from the user's perspective
> > > about what they'd really want to be notified of:
> > >
> > > a. "If my application cannot make progress temporarily due to
> > > various transient issues (John has listed several examples
> > > already), just handle that internally; I do not want to be
> > > notified or to worry about how to tune my timeout configs at all."
> > >
> > > b. "If my application cannot make progress for a long time, which
> > > is likely due to a bad config, a human error, network issues,
> > > etc., such that I should be involved in the troubleshooting loop,
> > > let me know sooner rather than later."
> > >
> > > And what they would not prefer, but may happen today:
> > >
> > > c. "One transient error causes a thread to die, but then after
> > > tasks are migrated everything goes back to normal; so the
> > > application silently lost a thread without letting me know." In
> > > fact, in such cases even a cascading exception that eventually
> > > kills all threads may be better, since at least the users would
> > > be notified.
> > >
> > > Based on that, instead of retrying immediately at the granularity
> > > of each blocking call, it should be sufficient to only consider
> > > the handling logic at the thread level. That is, within an
> > > iteration of the thread, it would try to:
> > >
> > > * initialize some created tasks;
> > > * restore some restoring tasks;
> > > * process some running tasks that have processable buffered
> > > records;
> > > * commit some tasks.
> > >
> > > In each of these steps, we may need to make some blocking calls on
> > > the underlying embedded clients, and if any of them times out, we
> > > would only be able to make partial progress, or none at all. If we
> > > still want a configured "retries" value, I think a better idea
> > > would be to say "if we cannot make progress for N consecutive
> > > iterations of a thread, the user should be notified".
> > >
> > > ---------------
> > >
> > > 2) Second, let's consider a good way to notify the user. Today our
> > > way of notification is simple: throw the exception all the way up
> > > to the user's uncaught-exception-handler (if it's registered) and
> > > let the thread die. I'm wondering if we could instead educate
> > > users to watch some key metrics as "progress indicators" rather
> > > than relying on the exception handler. Some candidates in mind:
> > >
> > > * consumer-lag: this covers both source topics and repartition
> > > topics; it indicates whether one or more of the tasks within each
> > > sub-topology is lagging, in the case where *some or all* of the
> > > threads cannot make progress. E.g., if a downstream task's thread
> > > is blocked somehow while its upstream task's thread is not, then
> > > the consumer-lag on the repartition topic would keep growing.
> > >
> > > * *idle* state: this is an idea we discussed in
> > > https://issues.apache.org/jira/browse/KAFKA-6520, i.e., to
> > > introduce a new instance-level state for the case where all
> > > threads of the instance cannot make progress (primarily because
> > > they cannot talk to the brokers).
> > >
> > > * process-rate: this is at the thread level. However, if some
> > > tasks cannot make progress while others within the same thread
> > > still can, its process-rate would not drop to zero, so it's harder
> > > to interpret than consumer-lag.
> > >
> > > If we feel that relying on metrics is better than throwing the
> > > exception and letting the thread die, then we would not need the
> > > "retries" config either.
> > >
> > > ---------------
> > >
> > > 3) This may be semi-related to the timeout itself, but as I
> > > mentioned, one common issue we need to resolve today is losing a
> > > thread BUT not losing the whole instance. In other words, when we
> > > have to throw an exception from a thread (not due to timeouts,
> > > but, say, due to some fatal error), should we just kill the
> > > corresponding thread, or should we be more brutal and kill the
> > > whole instance instead? I'm happy to defer this to another
> > > discussion thread but just bring it up here.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Feb 27, 2020 at 10:40 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > >> Hi Matthias,
> > >>
> > >> Thanks for the proposal! I think this will be a wonderful
> > >> improvement to Streams. In particular, thanks for the motivation.
> > >> It would indeed be nice not to have to set long timeout configs
> > >> and block individual client requests in order to cope with
> > >> transient slow responses.
> > >>
> > >> I'm very well aware that this might make me sound like a crazy
> > >> person, but one alternative I'd like to consider is not bounding
> > >> the retries at all. Instead, Streams would just skip over
> > >> timed-out tasks and try again on the next iteration, as you
> > >> proposed, but would continue to do so indefinitely. Clearly, we
> > >> shouldn't do such a thing silently, so I'd further propose to log
> > >> a warning every time a task times out and also to maintain a new
> > >> metric indicating task timeouts.
> > >>
> > >> To see why this might be attractive, let me pose a hypothetical
> > >> installation which has thousands of Streams instances, maybe as
> > >> part of hundreds of applications belonging to dozens of teams.
> > >> Let's also assume there is a single broker cluster serving all
> > >> these instances. Such an environment has a number of transient
> > >> failure modes:
> > >> * A single broker instance may become slow to respond due to
> > >> hardware failures (e.g., a bad NIC) or other environmental causes
> > >> (CPU competition with co-resident processes, long JVM GC pauses,
> > >> etc.). Single-broker unavailability could cause some tasks to
> > >> time out while others proceed in an individual Streams instance.
> > >> * The entire broker cluster could become temporarily unavailable
> > >> (consider: a faulty firewall configuration gets deployed,
> > >> severing all Streams instances from the brokers).
> > >> * A faulty security configuration may temporarily sever a whole
> > >> application from the brokers.
> > >> * Any number of causes could likewise sever a single instance in
> > >> a single application from all brokers.
> > >> * Finally, networking problems can disconnect arbitrary pairs of
> > >> Streams instances and broker instances.
> > >>
> > >> This is not an accounting of all possible failure modes,
> > >> obviously, but the point is that, in a large, decentralized
> > >> organization, you can experience lots of transient failures that
> > >> have some features in common:
> > >> F1. It's someone else's fault, and someone else must take action
> > >> to fix it.
> > >> F2. It will take "human time" to fix it, i.e., hours, not
> > >> milliseconds.
> > >> F3. A single failure can affect "everyone" (e.g., one broker with
> > >> long GCs can cause timeouts in all thousands of instances over
> > >> all dozens of teams).
> > >>
> > >> As an operator belonging to one team, whether we bound retries or
> > >> not, I would need to be alerted when the app stops making
> > >> progress, I'd need to investigate, and in the above cases, I'd
> > >> need to escalate to the network and/or broker infrastructure
> > >> teams.
> > >>
> > >> Clearly, I can be alerted either by threads dying or by
> > >> non-progress metrics. As a responsible operator, I'd have alerts
> > >> on _both_, so we shouldn't consider either signal to be "louder"
> > >> or more reliable than the other.
> > >>
> > >> A side observation: in a lot of the failure modes, a specific
> > >> task won't be able to make progress no matter which thread or
> > >> instance it's on (e.g., if the transaction coordinator for one of
> > >> its output partitions is slow or unresponsive). Therefore,
> > >> killing the thread under a bounded retry config would only result
> > >> in a cascade of thread deaths across all my instances until
> > >> either I run out of threads or the incident is resolved.
> > >>
> > >> The key questions to me are:
> > >> Q1. Do we want to continue trying to make what progress we can
> > >> while the incident is being investigated and remediated?
> > >> Q2. Should I (the operator for a single team) have to take any
> > >> action once the infrastructure failures are resolved?
> > >>
> > >> We can paraphrase these as, "do you want your business to grind
> > >> to a halt due to a single failure?", and "do you want everyone to
> > >> stay up all night waiting for a fix so they can all restart their
> > >> applications?"
> > >>
> > >> Just from the operator/business perspective, it seems like we
> > >> want Q1:yes and Q2:no, which in combination with F1-F3 above
> > >> indicates to me that it would be better for Streams to just keep
> > >> retrying indefinitely.
> > >>
> > >> There is one point I think you've mentioned to me in the past: it
> > >> may not be _safe_ to just quit working on one specific task while
> > >> progressing on others. If we have a repartition topic sourced by
> > >> two tasks T1 and T2, feeding a windowed aggregation (for
> > >> example), then failing to process T1 while continuing on T2 for a
> > >> long time would cause a lot of timestamp skew, and could
> > >> ultimately result in all those delayed records in T1 being out of
> > >> the grace period by the time they get processed. Arguably, this
> > >> is a completely normal and expected situation in a distributed
> > >> system, which is why we have the grace period to begin with, but
> > >> since the cause of this particular skew is inside Streams, it
> > >> would be possible, and nice, to detect and avoid the situation.
> > >>
> > >> However, we should note that killing a single thread that hosts
> > >> T1 will _not_ deterministically halt processing on T2, nor will
> > >> stopping the single instance that hosts T1, since T2 might be on
> > >> another instance. We would need a cluster-wide broadcast of some
> > >> kind either to halt all processing on all tasks, or (more
> > >> sophisticated) to halt processing on T2 when we detect
> > >> non-progress of T1.
> > >>
> > >> Depending on the failure mode, it's possible that just shuffling
> > >> the tasks around could let us start making progress again, for
> > >> example when only a single Streams instance can't reach one or
> > >> more brokers. However, this objective is not accomplished by just
> > >> stopping one thread; we would need to relocate all tasks off the
> > >> affected instance to attempt this remediation.
> > >>
> > >> A separate thing to point out is that an instance being
> > >> unavailable for processing does not imply that it is also
> > >> unavailable for querying. Especially in light of KIP-535, where
> > >> we started to support querying "stale" stores, it seems
> > >> worthwhile to keep the threads and instances alive, even if they
> > >> pause processing.
> > >>
> > >> Well, if you've made it this far, congratulations. I owe you a
> > >> beer. I hope you don't interpret the length of this email as
> > >> reflective of my zeal.
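> > >>
> > >> (To be concrete about the grace-period point above, the kind of
> > >> aggregation I have in mind looks like this; the topic name and
> > >> the 5m/10m values are made up:)
> > >>
> > >>     import java.time.Duration;
> > >>     import org.apache.kafka.streams.StreamsBuilder;
> > >>     import org.apache.kafka.streams.kstream.TimeWindows;
> > >>
> > >>     final StreamsBuilder builder = new StreamsBuilder();
> > >>     builder.stream("input")
> > >>            .groupByKey()
> > >>            // records arriving more than 10 minutes after window
> > >>            // end are dropped -- exactly what long task-level
> > >>            // skew on T1 would trigger
> > >>            .windowedBy(TimeWindows.of(Duration.ofMinutes(5))
> > >>                                   .grace(Duration.ofMinutes(10)))
> > >>            .count();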
> > >> I'm also ok with a bounded retries config of some kind, but I
> > >> wanted to be sure we've considered the above effects.
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >>
> > >> On Sat, Feb 22, 2020, at 00:51, Matthias J. Sax wrote:
> > >>> Hi,
> > >>>
> > >>> I would like to propose KIP-572 to make Kafka Streams more
> > >>> robust with regard to timeout exception handling.
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
> > >>>
> > >>> Note that there is a long list of rejected alternatives that can
> > >>> be used as a starting point for the discussion. In fact, I am
> > >>> not sure whether one of those listed alternatives might be
> > >>> better than the current proposal -- I just had to pick one
> > >>> design for now (the reason I picked the current design is that
> > >>> it's semantically fully backward compatible and does not
> > >>> introduce any new configs).
> > >>>
> > >>> Looking forward to your feedback.
> > >>>
> > >>> -Matthias