Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
I'm fine with always removing threads in the DEAD state from localThreadsMetadata(). On the other hand, if we want to give users more info for debugging, we can consider eating the complexity ourselves by guaranteeing that, within the user-registered uncaught exception handler, localThreadsMetadata() would include the thread that is dying / throwing at that moment. Implementation-wise, we can always register our own internal handler which calls the user-registered one if it is set, and only after that remove the thread metadata from the instance cache.

Personally I think it is a bit too much, so I'm in favor of "adding it later when people complain" :)

Guozhang

On Fri, Sep 18, 2020 at 5:17 PM Sophie Blee-Goldman wrote: > Makes sense to me :) > > On Thu, Sep 17, 2020 at 9:34 AM Bruno Cadonna wrote: > > > Hi Sophie, > > > > Thank you for the feedback! I replied inline. > > > > Best, > > Bruno > > > > On 16.09.20 19:19, Sophie Blee-Goldman wrote: > > >> > > >> We guarantee that the metadata of the dead stream threads will be > > >> returned by KafkaStreams#localThreadsMetadata() at least until the > next > > >> call to KafkaStreams#addStreamThread() or > > >> KafkaStreams#removeStreamThread() after the stream thread transited to > > >> DEAD > > > > > > > > > This seems kind of tricky...personally I would find it pretty odd if I > > > queried the > > > local thread metadata and found two threads, A (alive) and B (dead), > and > > > then > > > called removeStreamThread() and now suddenly I have zero. Or if I call > > > addStreamThread and now I still have two threads. > > > > > > > The behavior might be unusual, but it is well defined and not random by > > any means. > > > > > Both of those results seem to indicate that only live threads "count" > and > > > are returned > > > by localThreadsMetadata().
But in reality we do temporarily keep the > dead > > > thread, > > > but only for the arbitrary amount of time until the next time you want > to > > > add or > > > remove some other stream thread? That seems like a weird side effect of > > the > > > add/removeStreamThread APIs. > > > > > > > This is not a side effect that just happens to occur. This is a > > guarantee that users get. It gives users the possibility to retrieve the > > metadata of the dead stream threads since the last call to > > add/removeStreamThread. Admittedly, this guarantee overlap with the > > current/planned implementation. But that is more a coincidence. > > > > I would be more concerned about when add/removeStreamThread is called > > from different threads which could happen if an uncaught exception > > handler is called that wants to replace a stream thread and a thread > > that is responsible for automated scaling up is running. > > > > > If we really think users might want to log the metadata of dead > threads, > > > then > > > let's just do that for them or give them a way to do exactly that. > > > > > > > Logging the metatdata of dead stream threads for the user is a valid > > alternative. Giving users the way to do exactly that is hard because the > > StreamThread class is not part of the public API. They would always need > > to call a method on the KafkaStreams object where we already have > > localThreadsMetadata(). > > > > > I'm not that concerned about the backwards compatibility of removing > dead > > > threads from the localThreadsMetadata, because I find it hard to > believe > > > that > > > users do anything other than just skip over them in the list (set?) > that > > > gets > > > returned. But maybe someone can chime in with an example use case. > > > > > > > I am also not too much concerned about backwards compatibility. That > > would indeed be a side effect of the current proposal. 
> > > > > I'm actually even a little skeptical that any users might want to log > the > > > metadata of a > > > dead thread, since all of the metadata is only useful for IQ on live > > > threads or > > > already covered by other easily discoverable logging elsewhere, or > both. > > > > > > > Said all of the above, I actually agree with you that there is not that > > much information in the metadata of a dead stream thread that is > > interesting. The name of the stream thread is known in the uncaught > > exception handler. The names of the clients, like consumer etc., used by > > the stream thread can be derived from the name of the stream thread. > > Finally, the sets of active and standby tasks should be empty for a dead > > stream thread. > > > > Hence, I backpedal and propose to filter out dead stream threads from > > localThreadsMetadata(). WDYT? > > > > > On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna > > wrote: > > > > > >> Hi again, > > >> > > >> I just realized that if we filter out DEAD stream threads in > > >> localThreadsMetadata(), users cannot log the metadata of dying stream > > >> threads in the uncaught exception handler. > > >> > > >> I realized this thanks to the example Guozhang requested in the KIP. > > >>
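The internal-handler idea from the top of this message can be sketched roughly as follows. This is only an illustration under stated assumptions: `ClientSketch`, its `threadMetadata` cache, `newThread`, and `demo` are hypothetical stand-ins and not the KafkaStreams implementation; the only real API used is the JDK's `Thread.UncaughtExceptionHandler`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Streams client; NOT the real KafkaStreams class.
class ClientSketch {
    // Assumed metadata cache: thread name -> state.
    private final Map<String, String> threadMetadata = new ConcurrentHashMap<>();
    // User-registered handler; stays null if none was set.
    private volatile Thread.UncaughtExceptionHandler userHandler;

    void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
        this.userHandler = handler;
    }

    // Snapshot of the cache, loosely modelled on localThreadsMetadata().
    Map<String, String> localThreadsMetadata() {
        return new HashMap<>(threadMetadata);
    }

    Thread newThread(String name, Runnable body) {
        Thread thread = new Thread(body, name);
        threadMetadata.put(name, "RUNNING");
        // Internal handler: mark DEAD, call the user handler, then drop the entry.
        thread.setUncaughtExceptionHandler((t, error) -> {
            threadMetadata.put(t.getName(), "DEAD");
            try {
                Thread.UncaughtExceptionHandler handler = userHandler;
                if (handler != null) {
                    handler.uncaughtException(t, error);
                }
            } finally {
                // Only after the user handler returns is the metadata removed.
                threadMetadata.remove(t.getName());
            }
        });
        return thread;
    }

    // Small demo: records what the user handler saw for the dying thread.
    static List<String> demo() {
        ClientSketch client = new ClientSketch();
        List<String> seen = new ArrayList<>();
        client.setUncaughtExceptionHandler((t, e) ->
            seen.add(t.getName() + "=" + client.localThreadsMetadata().get(t.getName())));
        Thread thread = client.newThread("stream-thread-1", () -> {
            throw new IllegalStateException("simulated failure");
        });
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add("after=" + client.localThreadsMetadata().size());
        return seen;
    }
}
```

In this sketch the user handler still sees the dying thread in state DEAD, and the entry is gone once the handler has returned. Whether that extra guarantee is worth the complexity is exactly the open question in the message above.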
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Makes sense to me :) On Thu, Sep 17, 2020 at 9:34 AM Bruno Cadonna wrote: > Hi Sophie, > > Thank you for the feedback! I replied inline. > > Best, > Bruno > > On 16.09.20 19:19, Sophie Blee-Goldman wrote: > >> > >> We guarantee that the metadata of the dead stream threads will be > >> returned by KafkaStreams#localThreadsMetadata() at least until the next > >> call to KafkaStreams#addStreamThread() or > >> KafkaStreams#removeStreamThread() after the stream thread transited to > >> DEAD > > > > > > This seems kind of tricky...personally I would find it pretty odd if I > > queried the > > local thread metadata and found two threads, A (alive) and B (dead), and > > then > > called removeStreamThread() and now suddenly I have zero. Or if I call > > addStreamThread and now I still have two threads. > > > > The behavior might be unusual, but it is well defined and not random by > any means. > > > Both of those results seem to indicate that only live threads "count" and > > are returned > > by localThreadsMetadata(). But in reality we do temporarily keep the dead > > thread, > > but only for the arbitrary amount of time until the next time you want to > > add or > > remove some other stream thread? That seems like a weird side effect of > the > > add/removeStreamThread APIs. > > > > This is not a side effect that just happens to occur. This is a > guarantee that users get. It gives users the possibility to retrieve the > metadata of the dead stream threads since the last call to > add/removeStreamThread. Admittedly, this guarantee overlap with the > current/planned implementation. But that is more a coincidence. > > I would be more concerned about when add/removeStreamThread is called > from different threads which could happen if an uncaught exception > handler is called that wants to replace a stream thread and a thread > that is responsible for automated scaling up is running. 
> > > If we really think users might want to log the metadata of dead threads, > > then > > let's just do that for them or give them a way to do exactly that. > > > > Logging the metatdata of dead stream threads for the user is a valid > alternative. Giving users the way to do exactly that is hard because the > StreamThread class is not part of the public API. They would always need > to call a method on the KafkaStreams object where we already have > localThreadsMetadata(). > > > I'm not that concerned about the backwards compatibility of removing dead > > threads from the localThreadsMetadata, because I find it hard to believe > > that > > users do anything other than just skip over them in the list (set?) that > > gets > > returned. But maybe someone can chime in with an example use case. > > > > I am also not too much concerned about backwards compatibility. That > would indeed be a side effect of the current proposal. > > > I'm actually even a little skeptical that any users might want to log the > > metadata of a > > dead thread, since all of the metadata is only useful for IQ on live > > threads or > > already covered by other easily discoverable logging elsewhere, or both. > > > > Said all of the above, I actually agree with you that there is not that > much information in the metadata of a dead stream thread that is > interesting. The name of the stream thread is known in the uncaught > exception handler. The names of the clients, like consumer etc., used by > the stream thread can be derived from the name of the stream thread. > Finally, the sets of active and standby tasks should be empty for a dead > stream thread. > > Hence, I backpedal and propose to filter out dead stream threads from > localThreadsMetadata(). WDYT? 
> > > On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna > wrote: > > > >> Hi again, > >> > >> I just realized that if we filter out DEAD stream threads in > >> localThreadsMetadata(), users cannot log the metadata of dying stream > >> threads in the uncaught exception handler. > >> > >> I realized this thanks to the example Guozhang requested in the KIP. > >> Thank you for that, Guozhang! > >> > >> Hence, I adapted the KIP as follows: > >> > >> - We do not filter out DEAD stream threads in > >> KafkaStreams#localThreadsMetadata() > >> > >> - We guarantee that the metadata of the dead stream threads will be > >> returned by KafkaStreams#localThreadsMetadata() at least until the next > >> call to KafkaStreams#addStreamThread() or > >> KafkaStreams#removeStreamThread() after the stream thread transited to > >> DEAD. Besides giving users the opportunity to log the metadata of a > >> dying stream thread in its uncaught exception handler, this guarantee > >> makes KafkaStreams#localThreadsMetadata() completely backward compatible > >> to the current behavior, because if KafkaStreams#addStreamThread() and > >> KafkaStreams#removeStreamThread() are never called, > >> KafkaStreams#localThreadsMetadata() will also return the metadata of all > >> streams threads that have ever died which corresponds to the current > >>
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Sophie,

Thank you for the feedback! I replied inline.

Best,
Bruno

On 16.09.20 19:19, Sophie Blee-Goldman wrote:
>> We guarantee that the metadata of the dead stream threads will be
>> returned by KafkaStreams#localThreadsMetadata() at least until the next
>> call to KafkaStreams#addStreamThread() or
>> KafkaStreams#removeStreamThread() after the stream thread transited to
>> DEAD
>
> This seems kind of tricky...personally I would find it pretty odd if I
> queried the local thread metadata and found two threads, A (alive) and
> B (dead), and then called removeStreamThread() and now suddenly I have
> zero. Or if I call addStreamThread and now I still have two threads.

The behavior might be unusual, but it is well defined and not random by
any means.

> Both of those results seem to indicate that only live threads "count"
> and are returned by localThreadsMetadata(). But in reality we do
> temporarily keep the dead thread, but only for the arbitrary amount of
> time until the next time you want to add or remove some other stream
> thread? That seems like a weird side effect of the
> add/removeStreamThread APIs.

This is not a side effect that just happens to occur. This is a
guarantee that users get. It gives users the possibility to retrieve the
metadata of the dead stream threads since the last call to
add/removeStreamThread. Admittedly, this guarantee overlaps with the
current/planned implementation. But that is more a coincidence.

I would be more concerned about when add/removeStreamThread is called
from different threads, which could happen if an uncaught exception
handler is called that wants to replace a stream thread and a thread
that is responsible for automated scaling up is running.

> If we really think users might want to log the metadata of dead
> threads, then let's just do that for them or give them a way to do
> exactly that.

Logging the metadata of dead stream threads for the user is a valid
alternative. Giving users the way to do exactly that is hard because the
StreamThread class is not part of the public API. They would always need
to call a method on the KafkaStreams object where we already have
localThreadsMetadata().

> I'm not that concerned about the backwards compatibility of removing
> dead threads from the localThreadsMetadata, because I find it hard to
> believe that users do anything other than just skip over them in the
> list (set?) that gets returned. But maybe someone can chime in with an
> example use case.

I am also not too concerned about backwards compatibility. That
would indeed be a side effect of the current proposal.

> I'm actually even a little skeptical that any users might want to log
> the metadata of a dead thread, since all of the metadata is only useful
> for IQ on live threads or already covered by other easily discoverable
> logging elsewhere, or both.

Having said all of the above, I actually agree with you that there is
not that much information in the metadata of a dead stream thread that
is interesting. The name of the stream thread is known in the uncaught
exception handler. The names of the clients, like consumer etc., used by
the stream thread can be derived from the name of the stream thread.
Finally, the sets of active and standby tasks should be empty for a dead
stream thread.

Hence, I backpedal and propose to filter out dead stream threads from
localThreadsMetadata(). WDYT?

> On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna wrote:
>
>> Hi again,
>>
>> I just realized that if we filter out DEAD stream threads in
>> localThreadsMetadata(), users cannot log the metadata of dying stream
>> threads in the uncaught exception handler.
>>
>> I realized this thanks to the example Guozhang requested in the KIP.
>> Thank you for that, Guozhang!
>>
>> Hence, I adapted the KIP as follows:
>>
>> - We do not filter out DEAD stream threads in
>> KafkaStreams#localThreadsMetadata()
>>
>> - We guarantee that the metadata of the dead stream threads will be
>> returned by KafkaStreams#localThreadsMetadata() at least until the next
>> call to KafkaStreams#addStreamThread() or
>> KafkaStreams#removeStreamThread() after the stream thread transited to
>> DEAD. Besides giving users the opportunity to log the metadata of a
>> dying stream thread in its uncaught exception handler, this guarantee
>> makes KafkaStreams#localThreadsMetadata() completely backward compatible
>> to the current behavior, because if KafkaStreams#addStreamThread() and
>> KafkaStreams#removeStreamThread() are never called,
>> KafkaStreams#localThreadsMetadata() will also return the metadata of all
>> streams threads that have ever died which corresponds to the current
>> behavior.
>>
>> - We guarantee that dead stream threads are removed from a Kafka Streams
>> client at latest after the next call to KafkaStreams#addStreamThread()
>> or KafkaStreams#removeStreamThread() following the transition of the
>> stream thread to DEAD. This guarantees that the number of maintained
>> stream threads does not grow indefinitely.
>>
>> Best,
>> Bruno
>>
>> On 16.09.20 09:23, Bruno Cadonna wrote:
>>> Hi Guozhang,
>>>
>>> Good
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
> We guarantee that the metadata of the dead stream threads will be
> returned by KafkaStreams#localThreadsMetadata() at least until the next
> call to KafkaStreams#addStreamThread() or
> KafkaStreams#removeStreamThread() after the stream thread transited to
> DEAD

This seems kind of tricky...personally I would find it pretty odd if I
queried the local thread metadata and found two threads, A (alive) and
B (dead), and then called removeStreamThread() and now suddenly I have
zero. Or if I call addStreamThread and now I still have two threads.

Both of those results seem to indicate that only live threads "count"
and are returned by localThreadsMetadata(). But in reality we do
temporarily keep the dead thread, but only for the arbitrary amount of
time until the next time you want to add or remove some other stream
thread? That seems like a weird side effect of the
add/removeStreamThread APIs.

If we really think users might want to log the metadata of dead threads,
then let's just do that for them or give them a way to do exactly that.

I'm not that concerned about the backwards compatibility of removing
dead threads from the localThreadsMetadata, because I find it hard to
believe that users do anything other than just skip over them in the
list (set?) that gets returned. But maybe someone can chime in with an
example use case.

I'm actually even a little skeptical that any users might want to log
the metadata of a dead thread, since all of the metadata is only useful
for IQ on live threads or already covered by other easily discoverable
logging elsewhere, or both.

On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna wrote:

> Hi again,
>
> I just realized that if we filter out DEAD stream threads in
> localThreadsMetadata(), users cannot log the metadata of dying stream
> threads in the uncaught exception handler.
>
> I realized this thanks to the example Guozhang requested in the KIP.
> Thank you for that, Guozhang!
> > Hence, I adapted the KIP as follows: > > - We do not filter out DEAD stream threads in > KafkaStreams#localThreadsMetadata() > > - We guarantee that the metadata of the dead stream threads will be > returned by KafkaStreams#localThreadsMetadata() at least until the next > call to KafkaStreams#addStreamThread() or > KafkaStreams#removeStreamThread() after the stream thread transited to > DEAD. Besides giving users the opportunity to log the metadata of a > dying stream thread in its uncaught exception handler, this guarantee > makes KafkaStreams#localThreadsMetadata() completely backward compatible > to the current behavior, because if KafkaStreams#addStreamThread() and > KafkaStreams#removeStreamThread() are never called, > KafkaStreams#localThreadsMetadata() will also return the metadata of all > streams threads that have ever died which corresponds to the current > behavior. > > - We guarantee that dead stream threads are removed from a Kafka Streams > client at latest after the next call to KafkaStreams#addStreamThread() > or KafkaStreams#removeStreamThread() following the transition of the > stream thread to DEAD. This guarantees that the number of maintained > stream threads does not grow indefinitely. > > > Best, > Bruno > > > > On 16.09.20 09:23, Bruno Cadonna wrote: > > Hi Guozhang, > > > > Good point! I would propose to filter out DEAD stream threads in > > localThreadsMetadata() to get consistent results that do not depend on > > timing. I will update the KIP accordingly. > > > > Best, > > Bruno > > > > On 16.09.20 06:02, Guozhang Wang wrote: > >> Thanks Bruno, your replies make sense to me. As for > >> localThreadsMetadata() itself, > >> I'd like to clarify if it would return any still-bookkept threads, or > >> would > >> it specifically filter out those DEAD threads even if they are not yet > >> removed. > >> > >> Otherwise, the KIP LGTM. 
> >> > >> Guozhang > >> > >> On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna > wrote: > >> > >>> Hi Guozhang, > >>> > >>> Thank you for your feedback. I replied inline. > >>> > >>> Best, > >>> Bruno > >>> > >>> On 09.09.20 23:43, Guozhang Wang wrote: > Hello Bruno, > > Finally got some time to review your KIP and the discussion thread > now.. > >>> a > few comments below: > > 1) I'm with Matthias about the newly added numberOfAliveStreamThreads > >>> v.s. > existing localThreadsMetadata: to me it seems we can always achieve > the > first based on the second. It seems not worthy to provide some "syntax > sugar" to the API but just let users do the filtering themselves. > >>> > >>> I am not married to that method. I removed it. > >>> > Furthermore, I'm wondering what's the rationale behind removing the > DEAD > threads from localThreadsMetadata()? Personally I feel returning all > threads, including those who are ever closed, either due to > exception or > due to removeStreamThread, would be beneficial for debugging > purposes, as > long as within the
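The A (alive) / B (dead) scenario from this message can be played through with a tiny model. It is a sketch under assumptions only: `RetentionSketch` and all of its methods are hypothetical and merely imitate the proposed rule that dead threads stay visible until the next add/remove call; none of this is Kafka Streams code.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the bookkeeping under discussion; not Kafka Streams code.
class RetentionSketch {
    // Thread name -> state ("RUNNING" or "DEAD"), in insertion order.
    private final Map<String, String> threads = new LinkedHashMap<>();
    private int nextId = 1;

    // Dead threads are dropped only when add/remove is called.
    private void pruneDead() {
        threads.values().removeIf("DEAD"::equals);
    }

    String addStreamThread() {
        pruneDead();
        String name = "thread-" + nextId++;
        threads.put(name, "RUNNING");
        return name;
    }

    // Removes one live thread, if there is one.
    void removeStreamThread() {
        pruneDead();
        Iterator<String> names = threads.keySet().iterator();
        if (names.hasNext()) {
            names.next();
            names.remove();
        }
    }

    // Simulates a thread dying from an uncaught exception.
    void markDead(String name) {
        threads.replace(name, "DEAD");
    }

    // Number of entries a metadata query would return in this model.
    int metadataSize() {
        return threads.size();
    }

    // Builds the scenario: thread A alive, thread B dead.
    static RetentionSketch aliveAndDead() {
        RetentionSketch client = new RetentionSketch();
        client.addStreamThread();            // A
        String b = client.addStreamThread(); // B
        client.markDead(b);
        return client;
    }
}
```

Starting from `aliveAndDead()`, the model reports two entries; after `removeStreamThread()` it reports zero, and after `addStreamThread()` it still reports two. That is the jump in counts this message calls odd.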
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi again,

I just realized that if we filter out DEAD stream threads in localThreadsMetadata(), users cannot log the metadata of dying stream threads in the uncaught exception handler.

I realized this thanks to the example Guozhang requested in the KIP. Thank you for that, Guozhang!

Hence, I adapted the KIP as follows:

- We do not filter out DEAD stream threads in KafkaStreams#localThreadsMetadata()

- We guarantee that the metadata of the dead stream threads will be returned by KafkaStreams#localThreadsMetadata() at least until the next call to KafkaStreams#addStreamThread() or KafkaStreams#removeStreamThread() after the stream thread transitioned to DEAD. Besides giving users the opportunity to log the metadata of a dying stream thread in its uncaught exception handler, this guarantee makes KafkaStreams#localThreadsMetadata() completely backward compatible with the current behavior, because if KafkaStreams#addStreamThread() and KafkaStreams#removeStreamThread() are never called, KafkaStreams#localThreadsMetadata() will also return the metadata of all stream threads that have ever died, which corresponds to the current behavior.

- We guarantee that dead stream threads are removed from a Kafka Streams client at the latest after the next call to KafkaStreams#addStreamThread() or KafkaStreams#removeStreamThread() following the transition of the stream thread to DEAD. This guarantees that the number of maintained stream threads does not grow indefinitely.

Best,
Bruno

On 16.09.20 09:23, Bruno Cadonna wrote:

Hi Guozhang,

Good point! I would propose to filter out DEAD stream threads in localThreadsMetadata() to get consistent results that do not depend on timing. I will update the KIP accordingly.

Best,
Bruno

On 16.09.20 06:02, Guozhang Wang wrote:

Thanks Bruno, your replies make sense to me.
As for localThreadsMetadata() itself, I'd like to clarify if it would return any still-bookkept threads, or would it specifically filter out those DEAD threads even if they are not yet removed. Otherwise, the KIP LGTM. Guozhang On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna wrote: Hi Guozhang, Thank you for your feedback. I replied inline. Best, Bruno On 09.09.20 23:43, Guozhang Wang wrote: Hello Bruno, Finally got some time to review your KIP and the discussion thread now.. a few comments below: 1) I'm with Matthias about the newly added numberOfAliveStreamThreads v.s. existing localThreadsMetadata: to me it seems we can always achieve the first based on the second. It seems not worthy to provide some "syntax sugar" to the API but just let users do the filtering themselves. I am not married to that method. I removed it. Furthermore, I'm wondering what's the rationale behind removing the DEAD threads from localThreadsMetadata()? Personally I feel returning all threads, including those who are ever closed, either due to exception or due to removeStreamThread, would be beneficial for debugging purposes, as long as within the lifetime of an instance we expect the amount of such dead threads will not increase linearly --- and if we agree with that, maybe we can rename "removeStreamThread" to sth. like "terminateStreamThread" indicating it is only terminated but not removed --- and of course if users do not want to see those DEAD threads they can always filter them out. I'm just proposing that we should still leave the door open for those who want to check those ever terminated threads. I actually think the number of dead stream threads might increase linearly. Assume users have a systematic error that continuously kills a stream thread and they blindly start a new stream thread in the uncaught exception handler. 
This scenario might be a mistake but if the systematic error does not occur at a high rate, it could also be a strategy to keep the application running during the investigation of the systematic error. IMO, removing dead stream threads makes Kafka Streams more robust because it prevent a possibly unbounded increase of memory usage. If users want to debug the dead stream threads they can monitor the number of dead threads with the metric proposed in the KIP and they could additionally log the metadata of the dying stream thread in the uncaught exception handler. I do not think that there is need to keep dead stream threads around. 2) I think it would help to write down some example user code in exception handler e.g. to illustrate how this would be implemented -- e.g. we know that practically the handler need to maintain a "this" reference of the instance anyways in order to shutdown the whole instance or, add/terminate threads dynamically, but I want to see if we have listed all possible call paths like: a) a thread's handler logic to terminate another thread, b) a thread handler to add new threads, etc are all appropriately supported without deadlocks. I added an example for an uncaught exception handler that adds a stream thread to the
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Guozhang, Good point! I would propose to filter out DEAD stream threads in localThreadsMetadata() to get consistent results that do not depend on timing. I will update the KIP accordingly. Best, Bruno On 16.09.20 06:02, Guozhang Wang wrote: Thanks Bruno, your replies make sense to me. As for localThreadsMetadata() itself, I'd like to clarify if it would return any still-bookkept threads, or would it specifically filter out those DEAD threads even if they are not yet removed. Otherwise, the KIP LGTM. Guozhang On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna wrote: Hi Guozhang, Thank you for your feedback. I replied inline. Best, Bruno On 09.09.20 23:43, Guozhang Wang wrote: Hello Bruno, Finally got some time to review your KIP and the discussion thread now.. a few comments below: 1) I'm with Matthias about the newly added numberOfAliveStreamThreads v.s. existing localThreadsMetadata: to me it seems we can always achieve the first based on the second. It seems not worthy to provide some "syntax sugar" to the API but just let users do the filtering themselves. I am not married to that method. I removed it. Furthermore, I'm wondering what's the rationale behind removing the DEAD threads from localThreadsMetadata()? Personally I feel returning all threads, including those who are ever closed, either due to exception or due to removeStreamThread, would be beneficial for debugging purposes, as long as within the lifetime of an instance we expect the amount of such dead threads will not increase linearly --- and if we agree with that, maybe we can rename "removeStreamThread" to sth. like "terminateStreamThread" indicating it is only terminated but not removed --- and of course if users do not want to see those DEAD threads they can always filter them out. I'm just proposing that we should still leave the door open for those who want to check those ever terminated threads. I actually think the number of dead stream threads might increase linearly. 
Assume users have a systematic error that continuously kills a stream thread and they blindly start a new stream thread in the uncaught exception handler. This scenario might be a mistake but if the systematic error does not occur at a high rate, it could also be a strategy to keep the application running during the investigation of the systematic error. IMO, removing dead stream threads makes Kafka Streams more robust because it prevent a possibly unbounded increase of memory usage. If users want to debug the dead stream threads they can monitor the number of dead threads with the metric proposed in the KIP and they could additionally log the metadata of the dying stream thread in the uncaught exception handler. I do not think that there is need to keep dead stream threads around. 2) I think it would help to write down some example user code in exception handler e.g. to illustrate how this would be implemented -- e.g. we know that practically the handler need to maintain a "this" reference of the instance anyways in order to shutdown the whole instance or, add/terminate threads dynamically, but I want to see if we have listed all possible call paths like: a) a thread's handler logic to terminate another thread, b) a thread handler to add new threads, etc are all appropriately supported without deadlocks. I added an example for an uncaught exception handler that adds a stream thread to the KIP. Removing a stream thread in an uncaught exception handler doesn't seem a common use case to me. Nevertheless, we need to make sure that we do not run in a deadlock in that case. I will consider that during the implementation and write tests to check for deadlocks. Shutting down the Kafka Streams client from inside an uncaught exception handler is outside the scope of this KIP. In the beginning it was part of the KIP, but during the discussion it turned out that we can fix our existing close() method to accomplish the shutdown from inside an uncaught exception handler. 
But I completely agree with you that we need to ensure that we do not run into a deadlock in this case. Guozhang On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax wrote: I would prefer to not add a new method. It seems unnecessary. `localThreadMetadata` does return all threads in all states(*) and thus provides full insight. (*) A thread in state DEAD could be returned as long as it's not removed yet. I don't see any advantage to pre-filter threads and to exclude threads in state CREATE or PENDING_SHUTDOWN. Even if a CREATED thread is not started yet, it is still "alive" in a broader sense. For example, if a user wants to scale out to 10 thread, and 8 are RUNNING and 2 are in state CREATED, a user won't need to add 2 more threads -- there are already 10 threads. For PENDING_SHUTDOWN and scale in it would be different I guess, as the proposal would be to filter them out right away. However, filtering them seems actually not to be "correct", as a thread in
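The "might increase linearly" argument quoted in this message can be made concrete with a small simulation. This is a sketch under assumptions: `GrowthSketch` and `bookkeptThreads` are hypothetical names, and the model simply replays "one thread dies, the handler blindly starts a replacement" a given number of times.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of repeated thread replacement; not Kafka Streams code.
class GrowthSketch {
    // Returns how many threads are still bookkept after `failures` crashes,
    // each followed by a blind replacement from the exception handler.
    static int bookkeptThreads(int failures, boolean pruneDeadOnAdd) {
        List<String> states = new ArrayList<>();
        states.add("RUNNING");
        for (int i = 0; i < failures; i++) {
            // The single running thread dies ...
            states.set(states.size() - 1, "DEAD");
            // ... and a replacement is added, optionally dropping dead ones first.
            if (pruneDeadOnAdd) {
                states.removeIf("DEAD"::equals);
            }
            states.add("RUNNING");
        }
        return states.size();
    }

    public static void main(String[] args) {
        System.out.println(bookkeptThreads(1000, false)); // dead threads kept
        System.out.println(bookkeptThreads(1000, true));  // dead threads pruned
    }
}
```

With 1000 failures the model keeps 1001 entries when dead threads are never removed and a single entry when they are pruned on each add. This mirrors the memory concern raised in the reply and says nothing about how the real client stores its threads.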
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Thanks Bruno, your replies make sense to me. As for localThreadsMetadata() itself, I'd like to clarify if it would return any still-bookkept threads, or would it specifically filter out those DEAD threads even if they are not yet removed. Otherwise, the KIP LGTM. Guozhang On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna wrote: > Hi Guozhang, > > Thank you for your feedback. I replied inline. > > Best, > Bruno > > On 09.09.20 23:43, Guozhang Wang wrote: > > Hello Bruno, > > > > Finally got some time to review your KIP and the discussion thread now.. > a > > few comments below: > > > > 1) I'm with Matthias about the newly added numberOfAliveStreamThreads > v.s. > > existing localThreadsMetadata: to me it seems we can always achieve the > > first based on the second. It seems not worthy to provide some "syntax > > sugar" to the API but just let users do the filtering themselves. > > I am not married to that method. I removed it. > > > Furthermore, I'm wondering what's the rationale behind removing the DEAD > > threads from localThreadsMetadata()? Personally I feel returning all > > threads, including those who are ever closed, either due to exception or > > due to removeStreamThread, would be beneficial for debugging purposes, as > > long as within the lifetime of an instance we expect the amount of such > > dead threads will not increase linearly --- and if we agree with that, > > maybe we can rename "removeStreamThread" to sth. like > > "terminateStreamThread" indicating it is only terminated but not removed > > --- and of course if users do not want to see those DEAD threads they can > > always filter them out. I'm just proposing that we should still leave the > > door open for those who want to check those ever terminated threads. > > > > I actually think the number of dead stream threads might increase > linearly. 
Assume users have a systematic error that continuously kills a > stream thread and they blindly start a new stream thread in the uncaught > exception handler. This scenario might be a mistake but if the > systematic error does not occur at a high rate, it could also be a > strategy to keep the application running during the investigation of the > systematic error. > > IMO, removing dead stream threads makes Kafka Streams more robust > because it prevent a possibly unbounded increase of memory usage. If > users want to debug the dead stream threads they can monitor the number > of dead threads with the metric proposed in the KIP and they could > additionally log the metadata of the dying stream thread in the uncaught > exception handler. I do not think that there is need to keep dead stream > threads around. > > > 2) I think it would help to write down some example user code in > exception > > handler e.g. to illustrate how this would be implemented -- e.g. we know > > that practically the handler need to maintain a "this" reference of the > > instance anyways in order to shutdown the whole instance or, > add/terminate > > threads dynamically, but I want to see if we have listed all possible > call > > paths like: a) a thread's handler logic to terminate another thread, b) a > > thread handler to add new threads, etc are all appropriately supported > > without deadlocks. > > > > I added an example for an uncaught exception handler that adds a stream > thread to the KIP. Removing a stream thread in an uncaught exception > handler doesn't seem a common use case to me. Nevertheless, we need to > make sure that we do not run in a deadlock in that case. I will consider > that during the implementation and write tests to check for deadlocks. > > Shutting down the Kafka Streams client from inside an uncaught exception > handler is outside the scope of this KIP. 
In the beginning it was part > of the KIP, but during the discussion it turned out that we can fix our > existing close() method to accomplish the shutdown from inside an > uncaught exception handler. But I completely agree with you that we need > to ensure that we do not run into a deadlock in this case. > > > > > > Guozhang > > > > > > On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax > wrote: > > > >> I would prefer to not add a new method. It seems unnecessary. > >> `localThreadMetadata` does return all threads in all states(*) and thus > >> provides full insight. > >> > >> (*) A thread in state DEAD could be returned as long as it's not removed > >> yet. > >> > >> I don't see any advantage to pre-filter threads and to exclude threads > >> in state CREATE or PENDING_SHUTDOWN. Even if a CREATED thread is not > >> started yet, it is still "alive" in a broader sense. For example, if a > >> user wants to scale out to 10 thread, and 8 are RUNNING and 2 are in > >> state CREATED, a user won't need to add 2 more threads -- there are > >> already 10 threads. > >> > >> For PENDING_SHUTDOWN and scale in it would be different I guess, as the > >> proposal would be to filter them out right away. However, filtering them > >> seems actually not to be
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Guozhang,

Thank you for your feedback. I replied inline.

Best,
Bruno

On 09.09.20 23:43, Guozhang Wang wrote:
> Hello Bruno,
>
> Finally got some time to review your KIP and the discussion thread now.. a
> few comments below:
>
> 1) I'm with Matthias about the newly added numberOfAliveStreamThreads v.s.
> existing localThreadsMetadata: to me it seems we can always achieve the
> first based on the second. It seems not worthy to provide some "syntax
> sugar" to the API but just let users do the filtering themselves.

I am not married to that method. I removed it.

> Furthermore, I'm wondering what's the rationale behind removing the DEAD
> threads from localThreadsMetadata()? Personally I feel returning all
> threads, including those who are ever closed, either due to exception or
> due to removeStreamThread, would be beneficial for debugging purposes, as
> long as within the lifetime of an instance we expect the amount of such
> dead threads will not increase linearly --- and if we agree with that,
> maybe we can rename "removeStreamThread" to sth. like
> "terminateStreamThread" indicating it is only terminated but not removed
> --- and of course if users do not want to see those DEAD threads they can
> always filter them out. I'm just proposing that we should still leave the
> door open for those who want to check those ever terminated threads.

I actually think the number of dead stream threads might increase linearly. Assume users have a systematic error that continuously kills a stream thread and they blindly start a new stream thread in the uncaught exception handler. This scenario might be a mistake, but if the systematic error does not occur at a high rate, it could also be a strategy to keep the application running during the investigation of the systematic error.

IMO, removing dead stream threads makes Kafka Streams more robust because it prevents a possibly unbounded increase of memory usage. If users want to debug the dead stream threads, they can monitor the number of dead threads with the metric proposed in the KIP, and they could additionally log the metadata of the dying stream thread in the uncaught exception handler. I do not think that there is a need to keep dead stream threads around.

> 2) I think it would help to write down some example user code in exception
> handler e.g. to illustrate how this would be implemented -- e.g. we know
> that practically the handler need to maintain a "this" reference of the
> instance anyways in order to shutdown the whole instance or, add/terminate
> threads dynamically, but I want to see if we have listed all possible call
> paths like: a) a thread's handler logic to terminate another thread, b) a
> thread handler to add new threads, etc are all appropriately supported
> without deadlocks.

I added an example for an uncaught exception handler that adds a stream thread to the KIP. Removing a stream thread in an uncaught exception handler doesn't seem a common use case to me. Nevertheless, we need to make sure that we do not run into a deadlock in that case. I will consider that during the implementation and write tests to check for deadlocks.

Shutting down the Kafka Streams client from inside an uncaught exception handler is outside the scope of this KIP. In the beginning it was part of the KIP, but during the discussion it turned out that we can fix our existing close() method to accomplish the shutdown from inside an uncaught exception handler. But I completely agree with you that we need to ensure that we do not run into a deadlock in this case.

> Guozhang
>
> On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax wrote:
>
>> I would prefer to not add a new method. It seems unnecessary.
>> `localThreadMetadata` does return all threads in all states(*) and thus
>> provides full insight.
>>
>> (*) A thread in state DEAD could be returned as long as it's not removed
>> yet.
>>
>> I don't see any advantage to pre-filter threads and to exclude threads
>> in state CREATE or PENDING_SHUTDOWN. Even if a CREATED thread is not
>> started yet, it is still "alive" in a broader sense. For example, if a
>> user wants to scale out to 10 thread, and 8 are RUNNING and 2 are in
>> state CREATED, a user won't need to add 2 more threads -- there are
>> already 10 threads.
>>
>> For PENDING_SHUTDOWN and scale in it would be different I guess, as the
>> proposal would be to filter them out right away. However, filtering them
>> seems actually not to be "correct", as a thread in PENDING_SHUTDOWN
>> might still process data and it's thus still "alive".
>>
>> If there is still a need later to add a new method about "alive thread"
>> we can always add as a follow up -- removing things is much harder.
>>
>> I also don't think that there is value in returning names of dead
>> threads, as we recycle names.
>>
>> -Matthias
>>
>> On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote:
>>> I agree that the current behavior of localThreadsMetadata() does not seem
>>> to match, but it seems like we will be forced to change it to only return
>>> currently-alive threads. For one thing, we plan to recycle
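
For illustration, the handler pattern discussed in this message (log the dying thread, then start a replacement) could look roughly like the sketch below. It is not the example from the KIP. `StreamsClient`, `FakeClient`, `ReplacingHandler`, and the replacement cap are all hypothetical names introduced here; the cap is an assumption added to address the concern that blindly replacing threads could go on without bound. As noted in the quoted comment, the handler keeps a reference to the client instance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the one client method this sketch needs. */
interface StreamsClient {
    /** Starts one more stream thread and returns its name, if one was started. */
    Optional<String> addStreamThread();
}

/** Fake client used only to make the sketch runnable without Kafka. */
final class FakeClient implements StreamsClient {
    int added = 0;

    @Override
    public Optional<String> addStreamThread() {
        added++;
        return Optional.of("replacement-" + added);
    }
}

/** Logs the dying thread, then starts a replacement while the budget allows. */
final class ReplacingHandler implements Thread.UncaughtExceptionHandler {
    private final StreamsClient client;          // reference to the client instance
    private final int maxReplacements;           // assumed cap, not part of the KIP
    private final AtomicInteger replacements = new AtomicInteger();
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    ReplacingHandler(StreamsClient client, int maxReplacements) {
        this.client = client;
        this.maxReplacements = maxReplacements;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        // Record which thread died and why before doing anything else.
        log.add("thread " + thread.getName() + " died: " + error.getMessage());
        if (replacements.getAndIncrement() < maxReplacements) {
            client.addStreamThread().ifPresent(name -> log.add("started " + name));
        } else {
            log.add("replacement budget used up; not adding a thread");
        }
    }
}
```

The handler takes no lock of its own while calling into the client, which keeps the sketch away from the deadlock question; whether the real client methods are safe to call from a dying thread is exactly what the implementation and tests mentioned above would need to confirm.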
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hello Bruno, Finally got some time to review your KIP and the discussion thread now.. a few comments below: 1) I'm with Matthias about the newly added numberOfAliveStreamThreads v.s. existing localThreadsMetadata: to me it seems we can always achieve the first based on the second. It seems not worthy to provide some "syntax sugar" to the API but just let users do the filtering themselves. Furthermore, I'm wondering what's the rationale behind removing the DEAD threads from localThreadsMetadata()? Personally I feel returning all threads, including those who are ever closed, either due to exception or due to removeStreamThread, would be beneficial for debugging purposes, as long as within the lifetime of an instance we expect the amount of such dead threads will not increase linearly --- and if we agree with that, maybe we can rename "removeStreamThread" to sth. like "terminateStreamThread" indicating it is only terminated but not removed --- and of course if users do not want to see those DEAD threads they can always filter them out. I'm just proposing that we should still leave the door open for those who want to check those ever terminated threads. 2) I think it would help to write down some example user code in exception handler e.g. to illustrate how this would be implemented -- e.g. we know that practically the handler need to maintain a "this" reference of the instance anyways in order to shutdown the whole instance or, add/terminate threads dynamically, but I want to see if we have listed all possible call paths like: a) a thread's handler logic to terminate another thread, b) a thread handler to add new threads, etc are all appropriately supported without deadlocks. Guozhang On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax wrote: > I would prefer to not add a new method. It seems unnecessary. > `localThreadMetadata` does return all threads in all states(*) and thus > provides full insight. 
> > (*) A thread in state DEAD could be returned as long as it's not removed > yet. > > I don't see any advantage to pre-filter threads and to exclude threads > in state CREATE or PENDING_SHUTDOWN. Even if a CREATED thread is not > started yet, it is still "alive" in a broader sense. For example, if a > user wants to scale out to 10 thread, and 8 are RUNNING and 2 are in > state CREATED, a user won't need to add 2 more threads -- there are > already 10 threads. > > For PENDING_SHUTDOWN and scale in it would be different I guess, as the > proposal would be to filter them out right away. However, filtering them > seems actually not to be "correct", as a thread in PENDING_SHUTDOWN > might still process data and it's thus still "alive". > > If there is still a need later to add a new method about "alive thread" > we can always add as a follow up -- removing things is much harder. > > I also don't think that there is value in returning names of dead > threads, as we recycle names. > > > -Matthias > > > On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote: > > I agree that the current behavior of localThreadsMetadata() does not seem > > to match, but it seems like we will be forced to change it to only return > > currently-alive threads. For one thing, we plan to recycle old thread > names. > > It would be pretty confusing for a user to get two (or more) > ThreadMetadata > > objects returned with the same name, since AFAICT this is the only > > distinguishing identifier of stream threads. I think we should enforce > that > > only live threads are returned by localThreadsMetadata(). Plus, as > Matthias > > pointed out, we plan to remove dead threads from the KafkaStreams client, > > so still returning them in the metadata would be extremely odd. > > > > If we think that there might be some use case that requires knowing which > > threads have died, we could consider adding a method that returns the > > names of dead threads. 
But the only use case I can imagine would probably > > be better served by a callback that gets invoked when the thread dies, > which > > we already have. > > > > On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna > wrote: > > > >> Hi Matthias and Sophie, > >> > >> I agree that localThreadsMetadata() can be used here. However, > >> localThreadsMetadata() returns all stream threads irrespectively of > >> their states. Alive stream threads are specified as being in one of the > >> following states: RUNNING, STARTING, PARTITIONS_REVOKED, and > >> PARTITIONS_ASSIGNED. Hence, users would need to filter the result of > >> localThreadsMetadata(). I thought, it would be neat to have a method > >> that hides this filtering and returns the number of alive stream > >> threads, because that is the most basic information you might need to > >> decide about adding or removing stream threads. For all more advanced > >> use cases users should use localThreadsMetadata(). I am also happy with > >> removing the method. WDYT? > >> > >> Best, > >> Bruno > >> > >> On 09.09.20 03:51, Matthias J. Sax wrote: > >>> Currently
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
I would prefer to not add a new method. It seems unnecessary. `localThreadMetadata` does return all threads in all states(*) and thus provides full insight. (*) A thread in state DEAD could be returned as long as it's not removed yet. I don't see any advantage to pre-filter threads and to exclude threads in state CREATE or PENDING_SHUTDOWN. Even if a CREATED thread is not started yet, it is still "alive" in a broader sense. For example, if a user wants to scale out to 10 thread, and 8 are RUNNING and 2 are in state CREATED, a user won't need to add 2 more threads -- there are already 10 threads. For PENDING_SHUTDOWN and scale in it would be different I guess, as the proposal would be to filter them out right away. However, filtering them seems actually not to be "correct", as a thread in PENDING_SHUTDOWN might still process data and it's thus still "alive". If there is still a need later to add a new method about "alive thread" we can always add as a follow up -- removing things is much harder. I also don't think that there is value in returning names of dead threads, as we recycle names. -Matthias On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote: > I agree that the current behavior of localThreadsMetadata() does not seem > to match, but it seems like we will be forced to change it to only return > currently-alive threads. For one thing, we plan to recycle old thread names. > It would be pretty confusing for a user to get two (or more) ThreadMetadata > objects returned with the same name, since AFAICT this is the only > distinguishing identifier of stream threads. I think we should enforce that > only live threads are returned by localThreadsMetadata(). Plus, as Matthias > pointed out, we plan to remove dead threads from the KafkaStreams client, > so still returning them in the metadata would be extremely odd. 
> > If we think that there might be some use case that requires knowing which > threads have died, we could consider adding a method that returns the > names of dead threads. But the only use case I can imagine would probably > be better served by a callback that gets invoked when the thread dies, which > we already have. > > On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna wrote: > >> Hi Matthias and Sophie, >> >> I agree that localThreadsMetadata() can be used here. However, >> localThreadsMetadata() returns all stream threads irrespectively of >> their states. Alive stream threads are specified as being in one of the >> following states: RUNNING, STARTING, PARTITIONS_REVOKED, and >> PARTITIONS_ASSIGNED. Hence, users would need to filter the result of >> localThreadsMetadata(). I thought, it would be neat to have a method >> that hides this filtering and returns the number of alive stream >> threads, because that is the most basic information you might need to >> decide about adding or removing stream threads. For all more advanced >> use cases users should use localThreadsMetadata(). I am also happy with >> removing the method. WDYT? >> >> Best, >> Bruno >> >> On 09.09.20 03:51, Matthias J. Sax wrote: >>> Currently we, don't cleanup dead threads, but the KIP proposes to change >>> this: >>> Stream threads that are in state DEAD will be removed from the stream >> threads of a Kafka Streams client. >>> >>> >>> -Matthias >>> >>> On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote: Ah, I forgot about localThreadsMetadata(). In that. case I agree, >> there's no reason to introduce a new method when we can get both the names and number of >> all running threads from this. I assume that we would update localThreadsMetadata to only return >> currently alive threads as part of this KIP -- at a quick glance, it seems like we don't do any pruning of dead threads at the moment On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax >> wrote: > I am not sure if we need a new method? 
There is already > `localThreadsMetadata()`. What do we gain by adding a new one? > > Returning the thread's name (as `Optional`) for both add() and > remove() is fine with me. > > > -Matthias > > On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote: >> Sorry Bruno, I think I missed the end of your message with the >> numberOfAliveStreamThreads() >> proposal. I agree, that would be better than the alternatives I >> listed. >> That said: >> >>> They rather suggest that the method returns a list of handles to the >> stream threads. >> >> I hadn't thought of that originally, but now that you mention it, this >> might be a good idea. >> I don't think we should return actual handles on the threads, but >> maybe a >> list of the thread >> names rather than a single number of currently alive threads. >> >> Since we seem to think it would be difficult if not impossible to keep >> track of the number >> of running stream threads, we should apply the same reasoning to the > names
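
The scale-out arithmetic from the example above (target of 10 threads, 8 RUNNING and 2 CREATED, so nothing to add) can be sketched as follows. This is only an illustration: `ScaleOut`, its `State` enum, and `threadsToAdd` are hypothetical names, and the choice to leave DEAD and PENDING_SHUTDOWN threads out of the count for scale-out is an assumption, since the thread leaves that point open.

```java
import java.util.List;

final class ScaleOut {
    /** Thread states named in this discussion; a stand-in, not the real enum. */
    enum State { CREATED, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }

    /**
     * Number of threads still missing to reach the target.
     * Assumption: threads that are DEAD or on their way out do not count.
     */
    static int threadsToAdd(int target, List<State> states) {
        long counted = states.stream()
                .filter(s -> s != State.DEAD && s != State.PENDING_SHUTDOWN)
                .count();
        return (int) Math.max(0, target - counted);
    }
}
```

With this counting rule a CREATED thread is treated like a RUNNING one, which matches the "alive in a broader sense" reading.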
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
I agree that the current behavior of localThreadsMetadata() does not seem to match, but it seems like we will be forced to change it to only return currently-alive threads. For one thing, we plan to recycle old thread names. It would be pretty confusing for a user to get two (or more) ThreadMetadata objects returned with the same name, since AFAICT this is the only distinguishing identifier of stream threads. I think we should enforce that only live threads are returned by localThreadsMetadata(). Plus, as Matthias pointed out, we plan to remove dead threads from the KafkaStreams client, so still returning them in the metadata would be extremely odd. If we think that there might be some use case that requires knowing which threads have died, we could consider adding a method that returns the names of dead threads. But the only use case I can imagine would probably be better served by a callback that gets invoked when the thread dies, which we already have. On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna wrote: > Hi Matthias and Sophie, > > I agree that localThreadsMetadata() can be used here. However, > localThreadsMetadata() returns all stream threads irrespectively of > their states. Alive stream threads are specified as being in one of the > following states: RUNNING, STARTING, PARTITIONS_REVOKED, and > PARTITIONS_ASSIGNED. Hence, users would need to filter the result of > localThreadsMetadata(). I thought, it would be neat to have a method > that hides this filtering and returns the number of alive stream > threads, because that is the most basic information you might need to > decide about adding or removing stream threads. For all more advanced > use cases users should use localThreadsMetadata(). I am also happy with > removing the method. WDYT? > > Best, > Bruno > > On 09.09.20 03:51, Matthias J. 
Sax wrote: > > Currently we, don't cleanup dead threads, but the KIP proposes to change > > this: > > > >> Stream threads that are in state DEAD will be removed from the stream > threads of a Kafka Streams client. > > > > > > -Matthias > > > > On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote: > >> Ah, I forgot about localThreadsMetadata(). In that. case I agree, > there's > >> no reason > >> to introduce a new method when we can get both the names and number of > all > >> running threads from this. > >> > >> I assume that we would update localThreadsMetadata to only return > currently > >> alive threads as part of this KIP -- at a quick glance, it seems like we > >> don't do > >> any pruning of dead threads at the moment > >> > >> On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax > wrote: > >> > >>> I am not sure if we need a new method? There is already > >>> `localThreadsMetadata()`. What do we gain by adding a new one? > >>> > >>> Returning the thread's name (as `Optional`) for both add() and > >>> remove() is fine with me. > >>> > >>> > >>> -Matthias > >>> > >>> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote: > Sorry Bruno, I think I missed the end of your message with the > numberOfAliveStreamThreads() > proposal. I agree, that would be better than the alternatives I > listed. > That said: > > > They rather suggest that the method returns a list of handles to the > stream threads. > > I hadn't thought of that originally, but now that you mention it, this > might be a good idea. > I don't think we should return actual handles on the threads, but > maybe a > list of the thread > names rather than a single number of currently alive threads. > > Since we seem to think it would be difficult if not impossible to keep > track of the number > of running stream threads, we should apply the same reasoning to the > >>> names > and not > assume the user can/will keep track of every thread returned by > addStreamThread() or > removeStreamThread(). 
Users should generally take any required action > immediately > after adding/removing the thread -- eg deregistering the thread > metrics > >>> -- > but it might > still be useful to provide a convenience method listing all of the > >>> current > threads > > And of course you could still get the number of threads easily by > >>> invoking > size() on the > returned list (or ordered set?). > > On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna > >>> wrote: > > > Thank you again for the feedback Sophie! > > > > As I tried to point out in my previous e-mail, removing a stream > thread > > from a Kafka Streams client that does not have alive stream threads > is > > nothing exceptional for the client per se. However, it can become > > exceptional within the context of the user. For example, if users > want > > to remove a stream thread from a client without alive stream threads > > because one if their metrics say so, then this is exceptional in the > > context of that user metric not in
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Matthias and Sophie, I agree that localThreadsMetadata() can be used here. However, localThreadsMetadata() returns all stream threads irrespectively of their states. Alive stream threads are specified as being in one of the following states: RUNNING, STARTING, PARTITIONS_REVOKED, and PARTITIONS_ASSIGNED. Hence, users would need to filter the result of localThreadsMetadata(). I thought, it would be neat to have a method that hides this filtering and returns the number of alive stream threads, because that is the most basic information you might need to decide about adding or removing stream threads. For all more advanced use cases users should use localThreadsMetadata(). I am also happy with removing the method. WDYT? Best, Bruno On 09.09.20 03:51, Matthias J. Sax wrote: Currently we, don't cleanup dead threads, but the KIP proposes to change this: Stream threads that are in state DEAD will be removed from the stream threads of a Kafka Streams client. -Matthias On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote: Ah, I forgot about localThreadsMetadata(). In that. case I agree, there's no reason to introduce a new method when we can get both the names and number of all running threads from this. I assume that we would update localThreadsMetadata to only return currently alive threads as part of this KIP -- at a quick glance, it seems like we don't do any pruning of dead threads at the moment On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax wrote: I am not sure if we need a new method? There is already `localThreadsMetadata()`. What do we gain by adding a new one? Returning the thread's name (as `Optional`) for both add() and remove() is fine with me. -Matthias On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote: Sorry Bruno, I think I missed the end of your message with the numberOfAliveStreamThreads() proposal. I agree, that would be better than the alternatives I listed. That said: They rather suggest that the method returns a list of handles to the stream threads. 
I hadn't thought of that originally, but now that you mention it, this might be a good idea. I don't think we should return actual handles on the threads, but maybe a list of the thread names rather than a single number of currently alive threads. Since we seem to think it would be difficult if not impossible to keep track of the number of running stream threads, we should apply the same reasoning to the names and not assume the user can/will keep track of every thread returned by addStreamThread() or removeStreamThread(). Users should generally take any required action immediately after adding/removing the thread -- eg deregistering the thread metrics -- but it might still be useful to provide a convenience method listing all of the current threads And of course you could still get the number of threads easily by invoking size() on the returned list (or ordered set?). On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna wrote: Thank you again for the feedback Sophie! As I tried to point out in my previous e-mail, removing a stream thread from a Kafka Streams client that does not have alive stream threads is nothing exceptional for the client per se. However, it can become exceptional within the context of the user. For example, if users want to remove a stream thread from a client without alive stream threads because one if their metrics say so, then this is exceptional in the context of that user metric not in the context of the Kafka Streams client. In that case, users should throw an exception and handle it. Regarding returning null, I do not like to return null because from a development point of view there is no distinction between returning null because we have a bug in the code or returning null because there are no alive stream threads. Additionally, Optional makes it more explicit that the result could also be empty. Thank you for the alternative method names! 
However, with the names you propose it is not immediately clear that the method returns an amount of stream threads. They rather suggest that the method returns a list of handles to the stream threads. I chose to use "aliveStreamThreads" to be consistent with the client-level metric "alive-stream-threads" which reports the same number of stream threads that numberOfAliveStreamThreads() should report. If others also think that the proposed name in the KIP is too clumsy, I am open to rename it, though. Best, Bruno On 08.09.20 20:12, Sophie Blee-Goldman wrote: it's never a good sign when the discussion moves into the vote thread Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes again. Thanks for redirecting me Bruno I suppose it's unfair to expect the callers to keep perfect track of the current number of stream threads, but it also seems like you shouldn't be calling removeStreamThread() when there are no threads left. Either you're just haphazardly removing threads and could unintentionally slip
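
A minimal sketch of the filtering described in this message, assuming the four alive states listed above (RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED). `ThreadInfo` is a hypothetical stand-in for the objects returned by localThreadsMetadata(), reduced to a name and a state string; `AliveThreads` and `aliveThreadNames` are likewise made up for this illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

final class AliveThreads {
    /** Hypothetical stand-in for one entry of the thread metadata. */
    static final class ThreadInfo {
        final String name;
        final String state;

        ThreadInfo(String name, String state) {
            this.name = name;
            this.state = state;
        }
    }

    /** The states this thread of discussion treats as "alive". */
    private static final Set<String> ALIVE_STATES = new HashSet<>(Arrays.asList(
            "RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED"));

    /** Names of all threads whose state is one of the alive states, sorted. */
    static SortedSet<String> aliveThreadNames(List<ThreadInfo> metadata) {
        SortedSet<String> names = new TreeSet<>();
        for (ThreadInfo info : metadata) {
            if (ALIVE_STATES.contains(info.state)) {
                names.add(info.name);
            }
        }
        return names;
    }
}
```

Calling size() on the returned set gives the number that numberOfAliveStreamThreads() was meant to report, so the names and the count come from the same filter.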
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Currently, we don't clean up dead threads, but the KIP proposes to change this: > Stream threads that are in state DEAD will be removed from the stream threads > of a Kafka Streams client. -Matthias On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote: > Ah, I forgot about localThreadsMetadata(). In that case I agree, there's > no reason > to introduce a new method when we can get both the names and number of all > running threads from this. > > I assume that we would update localThreadsMetadata to only return currently > alive threads as part of this KIP -- at a quick glance, it seems like we > don't do > any pruning of dead threads at the moment > > On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax wrote: > >> I am not sure if we need a new method? There is already >> `localThreadsMetadata()`. What do we gain by adding a new one? >> >> Returning the thread's name (as `Optional`) for both add() and >> remove() is fine with me. >> >> >> -Matthias >> >> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote: >>> Sorry Bruno, I think I missed the end of your message with the >>> numberOfAliveStreamThreads() >>> proposal. I agree, that would be better than the alternatives I listed. >>> That said: >>> They rather suggest that the method returns a list of handles to the >>> stream threads. >>> >>> I hadn't thought of that originally, but now that you mention it, this >>> might be a good idea. >>> I don't think we should return actual handles on the threads, but maybe a >>> list of the thread >>> names rather than a single number of currently alive threads. >>> >>> Since we seem to think it would be difficult if not impossible to keep >>> track of the number >>> of running stream threads, we should apply the same reasoning to the >> names >>> and not >>> assume the user can/will keep track of every thread returned by >>> addStreamThread() or >>> removeStreamThread().
Users should generally take any required action >>> immediately >>> after adding/removing the thread -- eg deregistering the thread metrics >> -- >>> but it might >>> still be useful to provide a convenience method listing all of the >> current >>> threads >>> >>> And of course you could still get the number of threads easily by >> invoking >>> size() on the >>> returned list (or ordered set?). >>> >>> On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna >> wrote: >>> Thank you again for the feedback Sophie! As I tried to point out in my previous e-mail, removing a stream thread from a Kafka Streams client that does not have alive stream threads is nothing exceptional for the client per se. However, it can become exceptional within the context of the user. For example, if users want to remove a stream thread from a client without alive stream threads because one if their metrics say so, then this is exceptional in the context of that user metric not in the context of the Kafka Streams client. In that case, users should throw an exception and handle it. Regarding returning null, I do not like to return null because from a development point of view there is no distinction between returning null because we have a bug in the code or returning null because there are no alive stream threads. Additionally, Optional makes it more explicit that the result could also be empty. Thank you for the alternative method names! However, with the names you propose it is not immediately clear that the method returns an amount of stream threads. They rather suggest that the method returns a list of handles to the stream threads. I chose to use "aliveStreamThreads" to be consistent with the client-level metric "alive-stream-threads" which reports the same number of stream threads that numberOfAliveStreamThreads() should report. If others also think that the proposed name in the KIP is too clumsy, I am open to rename it, >> though. 
Best, Bruno On 08.09.20 20:12, Sophie Blee-Goldman wrote: >> it's never a good sign when the discussion moves into the vote thread > > Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes > again. > Thanks for redirecting me Bruno > > I suppose it's unfair to expect the callers to keep perfect track of >> the > current > number of stream threads, but it also seems like you shouldn't be calling > removeStreamThread() when there are no threads left. Either you're just > haphazardly removing threads and could unintentionally slip into a >> state of > no > running threads without realizing it, or more realistically, you're > carefully > removing threads based on some metric(s) that convey whether the system is > over or under-provisioned. If your metrics say you're over-provisioned but > there's > not one thread running, well, that certainly sounds exceptional to me. >> Or > you
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Ah, I forgot about localThreadsMetadata(). In that case I agree, there's no
reason to introduce a new method when we can get both the names and number of
all running threads from this.

I assume that we would update localThreadsMetadata to only return currently
alive threads as part of this KIP -- at a quick glance, it seems like we don't
do any pruning of dead threads at the moment.

On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax wrote:

> I am not sure if we need a new method? There is already
> `localThreadsMetadata()`. What do we gain by adding a new one?
>
> Returning the thread's name (as `Optional<String>`) for both add() and
> remove() is fine with me.
>
> -Matthias
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
I am not sure if we need a new method? There is already
`localThreadsMetadata()`. What do we gain by adding a new one?

Returning the thread's name (as `Optional<String>`) for both add() and
remove() is fine with me.

-Matthias

On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
> Sorry Bruno, I think I missed the end of your message with the
> numberOfAliveStreamThreads() proposal. I agree, that would be better than
> the alternatives I listed. That said:
>
>> They rather suggest that the method returns a list of handles to the
>> stream threads.
>
> I hadn't thought of that originally, but now that you mention it, this
> might be a good idea. I don't think we should return actual handles on the
> threads, but maybe a list of the thread names rather than a single number
> of currently alive threads.
>
> Since we seem to think it would be difficult if not impossible to keep
> track of the number of running stream threads, we should apply the same
> reasoning to the names and not assume the user can/will keep track of every
> thread returned by addStreamThread() or removeStreamThread(). Users should
> generally take any required action immediately after adding/removing the
> thread -- e.g., deregistering the thread metrics -- but it might still be
> useful to provide a convenience method listing all of the current threads.
>
> And of course you could still get the number of threads easily by invoking
> size() on the returned list (or ordered set?).
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Sorry Bruno, I think I missed the end of your message with the
numberOfAliveStreamThreads() proposal. I agree, that would be better than the
alternatives I listed. That said:

> They rather suggest that the method returns a list of handles to the
> stream threads.

I hadn't thought of that originally, but now that you mention it, this might
be a good idea. I don't think we should return actual handles on the threads,
but maybe a list of the thread names rather than a single number of currently
alive threads.

Since we seem to think it would be difficult if not impossible to keep track
of the number of running stream threads, we should apply the same reasoning to
the names and not assume the user can/will keep track of every thread returned
by addStreamThread() or removeStreamThread(). Users should generally take any
required action immediately after adding/removing the thread -- e.g.,
deregistering the thread metrics -- but it might still be useful to provide a
convenience method listing all of the current threads.

And of course you could still get the number of threads easily by invoking
size() on the returned list (or ordered set?).

On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna wrote:

> Thank you again for the feedback Sophie!
>
> As I tried to point out in my previous e-mail, removing a stream thread
> from a Kafka Streams client that does not have alive stream threads is
> nothing exceptional for the client per se. However, it can become
> exceptional within the context of the user. For example, if users want
> to remove a stream thread from a client without alive stream threads
> because one of their metrics says so, then this is exceptional in the
> context of that user metric, not in the context of the Kafka Streams
> client. In that case, users should throw an exception and handle it.
>
> Regarding returning null, I do not like to return null because from a
> development point of view there is no distinction between returning null
> because we have a bug in the code or returning null because there are no
> alive stream threads. Additionally, Optional<String> makes it more
> explicit that the result could also be empty.
>
> Thank you for the alternative method names! However, with the names you
> propose it is not immediately clear that the method returns a number of
> stream threads. They rather suggest that the method returns a list of
> handles to the stream threads. I chose to use "aliveStreamThreads" to be
> consistent with the client-level metric "alive-stream-threads" which
> reports the same number of stream threads that
> numberOfAliveStreamThreads() should report. If others also think that
> the proposed name in the KIP is too clumsy, I am open to renaming it,
> though.
>
> Best,
> Bruno
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Thank you again for the feedback Sophie!

As I tried to point out in my previous e-mail, removing a stream thread from a
Kafka Streams client that does not have alive stream threads is nothing
exceptional for the client per se. However, it can become exceptional within
the context of the user. For example, if users want to remove a stream thread
from a client without alive stream threads because one of their metrics says
so, then this is exceptional in the context of that user metric, not in the
context of the Kafka Streams client. In that case, users should throw an
exception and handle it.

Regarding returning null, I do not like to return null because from a
development point of view there is no distinction between returning null
because we have a bug in the code or returning null because there are no alive
stream threads. Additionally, Optional<String> makes it more explicit that the
result could also be empty.

Thank you for the alternative method names! However, with the names you
propose it is not immediately clear that the method returns a number of stream
threads. They rather suggest that the method returns a list of handles to the
stream threads. I chose to use "aliveStreamThreads" to be consistent with the
client-level metric "alive-stream-threads" which reports the same number of
stream threads that numberOfAliveStreamThreads() should report. If others also
think that the proposed name in the KIP is too clumsy, I am open to renaming
it, though.

Best,
Bruno

On 08.09.20 20:12, Sophie Blee-Goldman wrote:
>> it's never a good sign when the discussion moves into the vote thread
>
> Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes
> again. Thanks for redirecting me Bruno
>
> I suppose it's unfair to expect the callers to keep perfect track of the
> current number of stream threads, but it also seems like you shouldn't be
> calling removeStreamThread() when there are no threads left. Either you're
> just haphazardly removing threads and could unintentionally slip into a
> state of no running threads without realizing it, or more realistically,
> you're carefully removing threads based on some metric(s) that convey
> whether the system is over or under-provisioned. If your metrics say you're
> over-provisioned but there's not one thread running, well, that certainly
> sounds exceptional to me. Or you might be right in that the cluster is
> over-provisioned but have just been directing the removeStreamThread() and
> addStreamThread() calls to instances at random, and end up with one massive
> instance and one with no threads at all. Again, this probably merits some
> human intervention (or system redesign).
>
> That said, I don't think there's any real harm to just returning null in
> this case, but I hope that users would pay attention to this since it seems
> likely to indicate something has gone seriously wrong. I suppose
> Optional<String> would be a reasonable compromise.
>
> As for the method name, what about activeStreamThreads() or
> liveStreamThreads()?
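A minimal sketch of the Optional-based contract argued for above, not taken
from the KIP: the client reports "nothing to remove" as an empty result, and a
caller that considers this exceptional in its own context throws its own
exception. FakeClient, removeOrFail() and the thread naming are hypothetical
stand-ins for illustration; they are not the Kafka Streams implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

public class OptionalRemoveSketch {

    // Hypothetical stand-in for a Kafka Streams client; it only tracks thread names.
    public static final class FakeClient {
        private final Deque<String> threads = new ArrayDeque<>();
        private int nextIndex = 1;

        // Adds a thread and returns its name.
        public Optional<String> addStreamThread() {
            final String name = "StreamThread-" + nextIndex++;
            threads.addLast(name);
            return Optional.of(name);
        }

        // Returns the removed thread's name, or empty if there is no thread to remove.
        // pollLast() yields null on an empty deque, which ofNullable maps to empty.
        public Optional<String> removeStreamThread() {
            return Optional.ofNullable(threads.pollLast());
        }
    }

    // Caller-side policy: this particular caller treats an empty result as an error.
    public static String removeOrFail(final FakeClient client) {
        return client.removeStreamThread()
            .orElseThrow(() -> new IllegalStateException("no alive stream thread to remove"));
    }

    public static void main(final String[] args) {
        final FakeClient client = new FakeClient();

        // Nothing to remove yet: the client just reports an empty result.
        System.out.println(client.removeStreamThread().isPresent());

        // After a thread is added, the same call succeeds.
        client.addStreamThread();
        System.out.println(removeOrFail(client));
    }
}
```

The choice of exception type is left to the caller here, which mirrors the
argument that the situation is exceptional for the user's metric rather than
for the client.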
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
> it's never a good sign when the discussion moves into the vote thread

Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes
again. Thanks for redirecting me Bruno

I suppose it's unfair to expect the callers to keep perfect track of the
current number of stream threads, but it also seems like you shouldn't be
calling removeStreamThread() when there are no threads left. Either you're
just haphazardly removing threads and could unintentionally slip into a state
of no running threads without realizing it, or more realistically, you're
carefully removing threads based on some metric(s) that convey whether the
system is over or under-provisioned. If your metrics say you're
over-provisioned but there's not one thread running, well, that certainly
sounds exceptional to me. Or you might be right in that the cluster is
over-provisioned but have just been directing the removeStreamThread() and
addStreamThread() calls to instances at random, and end up with one massive
instance and one with no threads at all. Again, this probably merits some
human intervention (or system redesign).

That said, I don't think there's any real harm to just returning null in this
case, but I hope that users would pay attention to this since it seems likely
to indicate something has gone seriously wrong. I suppose Optional<String>
would be a reasonable compromise.

As for the method name, what about activeStreamThreads() or
liveStreamThreads()?

On Mon, Sep 7, 2020 at 1:45 AM Bruno Cadonna wrote:

> Hi John,
>
> I agree with you except for checking null. I would rather use
> Optional<String> as the return type of both methods.
>
> I changed the subject from [VOTE] to [DISCUSS] so that we can follow up
> in the discussion thread.
>
> Best,
> Bruno
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Sophie,

Thanks for your feedback! I replied inline.

I changed the subject from [VOTE] to [DISCUSS] so that we can follow up in the
discussion thread.

Best,
Bruno

On 03.09.20 21:15, Sophie Blee-Goldman wrote:
> Hey, sorry for the late reply, I just have one minor suggestion. Since we
> don't make any guarantees about which thread gets removed or allow the user
> to specify, I think we should return either the index or full name of the
> thread that does get removed by removeStreamThread().

I guess that could be a good idea to allow users to log which stream thread
was removed and why. Did you have another use case in mind?

> I know you just updated the KIP to return true/false if there are/aren't
> any threads to be removed, but I think this would be more appropriate as an
> exception than as a return type. I think it's reasonable to expect users to
> have some sense of how many threads are remaining, and not try to remove
> a thread when there is none left. To me, that indicates something wrong
> with the user application code and should be treated as an exceptional
> case. I don't think the same code clarity argument applies here as to the
> addStreamThread() case, as there's no reason for an application to be
> looping and retrying removeStreamThread() since if that fails, it's because
> there are no threads left and thus it will continue to always fail. And if
> the user actually wants to shut down all threads, they should just close
> the whole application rather than call removeStreamThread() in a loop.

I do not agree that removing a stream thread from a client that does not have
any stream threads is exceptional -- at least not for the Kafka Streams
client. It may be for the caller, but then the caller should throw an
exception. It is not true that removeStreamThread() will always fail once it
starts to fail, because a stream thread could be added between two calls to
removeStreamThread(). I can imagine that users might also want to keep around
a running Kafka Streams client without running stream threads to be able to
start new stream threads faster, especially when a global table is involved,
which would still be updated even without running stream threads.

> While I generally think it should be straightforward for users to track how
> many stream threads they have running, maybe it would be nice to add
> a small utility method that does this for them. Something like
>
> // Returns the number of currently alive threads
> int runningStreamThreads();

I am not completely against this. I would just not call it
runningStreamThreads() because that could be misunderstood as returning
handles to stream threads in state RUNNING.

> On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax wrote:
>
>> +1 (binding)
>>
>> On 9/3/20 6:16 AM, Bruno Cadonna wrote:
>>> Hi,
>>>
>>> I would like to start the voting on KIP-663 that proposes to add methods
>>> to the Kafka Streams client to add and remove stream threads during
>>> execution.
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
>>>
>>> Best,
>>> Bruno
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi John,

I agree with you except for checking null. I would rather use
Optional<String> as the return type of both methods.

I changed the subject from [VOTE] to [DISCUSS] so that we can follow up in the
discussion thread.

Best,
Bruno

On 04.09.20 23:12, John Roesler wrote:
> Hi Sophie,
>
> Uh, oh, it's never a good sign when the discussion moves
> into the vote thread :)
>
> I agree with you, it seems like a good touch for
> removeStreamThread() to return the name of the thread that
> got removed, rather than a boolean flag. Maybe the return
> value would be `null` if there is no thread to remove.
>
> If we go that way, I'd suggest that addStreamThread() also
> return the name of the newly created thread, or null if no
> thread can be created right now.
>
> I'm not completely sure if I think that callers of this
> method would know exactly how many threads there are. Sure,
> if a human being is sitting there looking at the metrics or
> logs and decides to call the method, it would work out, but
> I'd expect this kind of method to find its way into
> automated tooling that reacts to things like current system
> load or resource saturation. Those kinds of toolchains often
> are part of a distributed system, and it's probably not that
> easy to guarantee that the thread count they observe is
> fully consistent with the number of threads that are
> actually running. Therefore, an in-situ `int
> numStreamThreads()` method might not be a bad idea. Then
> again, it seems sort of optional. A caller can catch an
> exception or react to a `null` return value just the same
> either way. Having both add/remove methods behave similarly
> is probably more valuable.
>
> Thanks,
> -John
>
> On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman wrote:
>> Hey, sorry for the late reply, I just have one minor suggestion. Since we
>> don't make any guarantees about which thread gets removed or allow the
>> user to specify, I think we should return either the index or full name
>> of the thread that does get removed by removeStreamThread().
>>
>> I know you just updated the KIP to return true/false if there are/aren't
>> any threads to be removed, but I think this would be more appropriate as
>> an exception than as a return type. I think it's reasonable to expect
>> users to have some sense of how many threads are remaining, and not try
>> to remove a thread when there is none left. To me, that indicates
>> something wrong with the user application code and should be treated as
>> an exceptional case. I don't think the same code clarity argument applies
>> here as to the addStreamThread() case, as there's no reason for an
>> application to be looping and retrying removeStreamThread() since if that
>> fails, it's because there are no threads left and thus it will continue
>> to always fail. And if the user actually wants to shut down all threads,
>> they should just close the whole application rather than call
>> removeStreamThread() in a loop.
>>
>> While I generally think it should be straightforward for users to track
>> how many stream threads they have running, maybe it would be nice to add
>> a small utility method that does this for them. Something like
>>
>> // Returns the number of currently alive threads
>> int runningStreamThreads();
>>
>> On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax wrote:
>>
>>> +1 (binding)
>>>
>>> On 9/3/20 6:16 AM, Bruno Cadonna wrote:
>>>> Hi,
>>>>
>>>> I would like to start the voting on KIP-663 that proposes to add
>>>> methods to the Kafka Streams client to add and remove stream threads
>>>> during execution.
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
>>>>
>>>> Best,
>>>> Bruno
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Thanks! SGTM. -Matthias On 9/3/20 3:17 AM, Bruno Cadonna wrote: > Hi Matthias, > > I replied inline. > > Best, > Bruno > > On 02.09.20 22:06, Matthias J. Sax wrote: >> Thanks for updating the KIP. >> >> Why do you propose to return `boolean` from addStreamThread() if the >> thread could not be started? As an alternative, we could also throw an >> exception if the client is not in state RUNNING? -- I guess both are >> valid options: just want to see what the pros/cons of each approach >> would be? >> > > I prefer to return a boolean because it is nothing exceptional if a > stream thread cannot be added due to an inappropriate state. State > changes are expected in Streams. Furthermore, users should not be forced > to control their program flow by catching exceptions. Let me give you > some examples for returning a boolean and throwing an exception: > > returning a boolean > > while (!kafkaStreams.addStreamThread() && > kafkaStreams.state() != State.NOT_RUNNING && > kafkaStreams.state() != State.ERROR) { > } > > > throwing an exception > > boolean added = false; > while (!added && > kafkaStreams.state() != State.NOT_RUNNING && > kafkaStreams.state() != State.ERROR) { > > try { > kafkaStreams.addStreamThread(); > added = true; > } catch (final Exception ex) { > // do nothing > } > } > > IMO the first example is more readable than the second. > > >> Btw: should we allow to add a new thread if the state is REBALANCING, >> too? I actually don't see a reason why we should not allow this? >> > > I guess you are right. I will update the KIP and include REBALANCING. > > >> For removeStreamThread(), might it be worth to actually guarantee that >> the thread with the largest index is stopped instead of leaving if >> unspecified? It does not seem to be a big burden on the implementation >> and given that we plan to reused indices of died threads, it might be >> nice to have a contract? Or would there be any negative impact if we >> guarantee it? 
>> > > I left unspecified which stream thread is removed since I could not find > any good reason for a guarantee. Also in your comment, I do not see what > advantage, we would have if we guaranteed that the stream thread with > the largest index is stopped. It would not guarantee that the next added > stream thread would get the largest index, because another stream thread > with a lower index could have failed in the meanwhile and now two > indices are up for grabs. > Leaving unspecified which stream thread is removed also gives us the > possibility to choose the stream thread to remove according to other > aspects like for example the one with the least local state. > > >> Another thought: should we add a parameter `numberOfThreads` to each >> method to allow users to start/stop multiple threads at once? >> > > I would keep it simple for now and add overloads if users request them. > > >> What happens if there is zero running threads and one calls >> removeStreamThread()? Should we also add a `boolean` flag and return >> `false` for this case (or throw an exception)? >> > > Yeah, I think this is a good idea for the programmatical removal of all > threads. However, I would not throw an exception for the reasons I > pointed out above. > > >> >> For the metric name, I would prefer "failed" over "crashed". Thoughts? >> > > I think I like "failed" more than "crashed" and it is also more > consistent with other parts of the code like the > ProductionExceptionHandlerResponse.FAIL. > > >> >> Side remark for the PR: can we make sure to update the description of >> `num.stream.threads` to explain that it's the _initial_ number of >> threads on startup? >> > > Good point! > >> >> -Matthias >> >> >> On 9/1/20 2:01 PM, Walker Carlson wrote: >>> Hi Bruno, >>> >>> I read through your updated KIP and it looks good to me. I agree with >>> adding the metric to keep track of crashed streams in replace of a >>> list of >>> dead streams. 
>>> >>> best, >>> Wlaker :) >>> >>> On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna wrote: >>> Hi John, your proposal makes sense! I will update the KIP. Best, Bruno On 01.09.20 17:31, John Roesler wrote: > Hello Bruno, > > Thanks for the update! The KIP looks good to me; I only have > a grammatical complaint about the proposed metric name. > > "Died" is a verb, the past tense of "to die", but in the > expression,"x stream threads", x should be an adjective. To > be fair, "died" is also the past participle of "to die", and > participles can usually be used as adjectives. Maybe it > sounds wrong to me because there's already a specifically > adjectival form: "dead". So "dead-stream-threads" seems more > natural. > > However, I'm not sure if that captures the specific meaning > you're shooting for, namely that the metric counts only the > threads that died exceptionally, vs.
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Matthias,

I replied inline.

Best,
Bruno

On 02.09.20 22:06, Matthias J. Sax wrote:
> Thanks for updating the KIP.
>
> Why do you propose to return `boolean` from addStreamThread() if the thread could not be started? As an alternative, we could also throw an exception if the client is not in state RUNNING? -- I guess both are valid options: just want to see what the pros/cons of each approach would be?

I prefer to return a boolean because it is nothing exceptional if a stream thread cannot be added due to an inappropriate state. State changes are expected in Streams. Furthermore, users should not be forced to control their program flow by catching exceptions. Let me give you some examples for returning a boolean and throwing an exception:

returning a boolean

    while (!kafkaStreams.addStreamThread() &&
           kafkaStreams.state() != State.NOT_RUNNING &&
           kafkaStreams.state() != State.ERROR) {
        // retry until a thread is added or the client reaches a terminal state
    }

throwing an exception

    boolean added = false;
    while (!added &&
           kafkaStreams.state() != State.NOT_RUNNING &&
           kafkaStreams.state() != State.ERROR) {
        try {
            kafkaStreams.addStreamThread();
            added = true;
        } catch (final Exception ex) {
            // do nothing
        }
    }

IMO the first example is more readable than the second.

> Btw: should we allow to add a new thread if the state is REBALANCING, too? I actually don't see a reason why we should not allow this?

I guess you are right. I will update the KIP and include REBALANCING.

> For removeStreamThread(), might it be worth to actually guarantee that the thread with the largest index is stopped instead of leaving it unspecified? It does not seem to be a big burden on the implementation and given that we plan to reuse indices of dead threads, it might be nice to have a contract? Or would there be any negative impact if we guarantee it?

I left unspecified which stream thread is removed since I could not find any good reason for a guarantee. Also in your comment, I do not see what advantage we would have if we guaranteed that the stream thread with the largest index is stopped. It would not guarantee that the next added stream thread would get the largest index, because another stream thread with a lower index could have failed in the meantime and now two indices are up for grabs.
Leaving unspecified which stream thread is removed also gives us the possibility to choose the stream thread to remove according to other aspects, for example the one with the least local state.

> Another thought: should we add a parameter `numberOfThreads` to each method to allow users to start/stop multiple threads at once?

I would keep it simple for now and add overloads if users request them.

> What happens if there are zero running threads and one calls removeStreamThread()? Should we also add a `boolean` flag and return `false` for this case (or throw an exception)?

Yeah, I think this is a good idea for the programmatic removal of all threads. However, I would not throw an exception for the reasons I pointed out above.

> For the metric name, I would prefer "failed" over "crashed". Thoughts?

I think I like "failed" more than "crashed" and it is also more consistent with other parts of the code like the ProductionExceptionHandlerResponse.FAIL.

> Side remark for the PR: can we make sure to update the description of `num.stream.threads` to explain that it's the _initial_ number of threads on startup?

Good point!

> -Matthias
>
> On 9/1/20 2:01 PM, Walker Carlson wrote:
>> Hi Bruno,
>>
>> I read through your updated KIP and it looks good to me. I agree with adding the metric to keep track of crashed streams in place of a list of dead streams.
>>
>> best,
>> Wlaker :)
>>
>> On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna wrote:
>>> Hi John,
>>>
>>> your proposal makes sense! I will update the KIP.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 01.09.20 17:31, John Roesler wrote:
>>>> Hello Bruno,
>>>>
>>>> Thanks for the update! The KIP looks good to me; I only have a grammatical complaint about the proposed metric name.
>>>>
>>>> "Died" is a verb, the past tense of "to die", but in the expression "x stream threads", x should be an adjective. To be fair, "died" is also the past participle of "to die", and participles can usually be used as adjectives. Maybe it sounds wrong to me because there's already a specifically adjectival form: "dead". So "dead-stream-threads" seems more natural.
>>>>
>>>> However, I'm not sure if that captures the specific meaning you're shooting for, namely that the metric counts only the threads that died exceptionally, vs. from calling "removeStreamThread()". What do you think of "crashed-stream-threads" instead?
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:
>>>>> Hi,
>>>>>
>>>>> I updated the KIP with the feedback so far. I removed the API to close the Kafka Streams client asynchronously, since it should be possible to avoid the deadlock with the existing method and without a KIP.
>>>>>
>>>>> Please have a look at the updated KIP and let me know what you think.
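The boolean-returning loop in this message retries without pausing. As a hedged variation, the sketch below bounds the number of attempts and sleeps between them. `AddThreadRetry`, `addWithBackoff`, and its parameters are hypothetical names; `tryAdd` stands in for a boolean-returning `addStreamThread()` and `isTerminal` for the NOT_RUNNING / ERROR state checks, so nothing here calls the real Kafka Streams API.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; the suppliers stand in for calls on a Streams client.
final class AddThreadRetry {

    // Tries to add a thread up to maxAttempts times, pausing between attempts.
    // Returns true as soon as tryAdd succeeds; returns false if the client is
    // in a terminal state, the attempts are exhausted, or the caller is interrupted.
    static boolean addWithBackoff(final BooleanSupplier tryAdd,
                                  final BooleanSupplier isTerminal,
                                  final int maxAttempts,
                                  final long pauseMs) {
        for (int attempt = 0; attempt < maxAttempts && !isTerminal.getAsBoolean(); attempt++) {
            if (tryAdd.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final int[] calls = {0};
        // Simulated client that accepts the new thread on the third attempt.
        final boolean added = addWithBackoff(() -> ++calls[0] >= 3, () -> false, 5, 10L);
        System.out.println("added=" + added + " after " + calls[0] + " attempts");
    }
}
```

The control flow stays exception-free, which is the readability argument made above for the boolean return type.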
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Thanks for updating the KIP. Why do you propose to return `boolean` from addStreamThread() if the thread could not be started? As an alternative, we could also throw an exception if the client is not in state RUNNING? -- I guess both are valid options: just want to see what the pros/cons of each approach would be? Btw: should we allow to add a new thread if the state is REBALANCING, too? I actually don't see a reason why we should not allow this? For removeStreamThread(), might it be worth to actually guarantee that the thread with the largest index is stopped instead of leaving if unspecified? It does not seem to be a big burden on the implementation and given that we plan to reused indices of died threads, it might be nice to have a contract? Or would there be any negative impact if we guarantee it? Another thought: should we add a parameter `numberOfThreads` to each method to allow users to start/stop multiple threads at once? What happens if there is zero running threads and one calls removeStreamThread()? Should we also add a `boolean` flag and return `false` for this case (or throw an exception)? For the metric name, I would prefer "failed" over "crashed". Thoughts? Side remark for the PR: can we make sure to update the description of `num.stream.threads` to explain that it's the _initial_ number of threads on startup? -Matthias On 9/1/20 2:01 PM, Walker Carlson wrote: > Hi Bruno, > > I read through your updated KIP and it looks good to me. I agree with > adding the metric to keep track of crashed streams in replace of a list of > dead streams. > > best, > Wlaker :) > > On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna wrote: > >> Hi John, >> >> your proposal makes sense! I will update the KIP. >> >> Best, >> Bruno >> >> On 01.09.20 17:31, John Roesler wrote: >>> Hello Bruno, >>> >>> Thanks for the update! The KIP looks good to me; I only have >>> a grammatical complaint about the proposed metric name. 
>>> >>> "Died" is a verb, the past tense of "to die", but in the >>> expression,"x stream threads", x should be an adjective. To >>> be fair, "died" is also the past participle of "to die", and >>> participles can usually be used as adjectives. Maybe it >>> sounds wrong to me because there's already a specifically >>> adjectival form: "dead". So "dead-stream-threads" seems more >>> natural. >>> >>> However, I'm not sure if that captures the specific meaning >>> you're shooting for, namely that the metric counts only the >>> threads that died exceptionally, vs. from calling >>> "removeStreamThread()". What do you think of "crashed- >>> stream-threads" instead? >>> >>> Thanks, >>> -John >>> >>> On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote: Hi, I updated the KIP with the feedback so far. I removed the API to close the Kafka Streams client asynchronously, since it should be possible to avoid the deadlock with the existing method and without a KIP. Please have a look at the updated KIP and let me know what you think. >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads Best, Bruno On 26.08.20 16:31, Bruno Cadonna wrote: > Hi, > > I would like to propose the following KIP to start and shut down stream > threads during execution as well as to shut down asynchronously a Kafka > Streams client from an uncaught exception handler. > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients > > > Best, > Bruno >>> >> > signature.asc Description: OpenPGP digital signature
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Bruno, I read through your updated KIP and it looks good to me. I agree with adding the metric to keep track of crashed streams in replace of a list of dead streams. best, Wlaker :) On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna wrote: > Hi John, > > your proposal makes sense! I will update the KIP. > > Best, > Bruno > > On 01.09.20 17:31, John Roesler wrote: > > Hello Bruno, > > > > Thanks for the update! The KIP looks good to me; I only have > > a grammatical complaint about the proposed metric name. > > > > "Died" is a verb, the past tense of "to die", but in the > > expression,"x stream threads", x should be an adjective. To > > be fair, "died" is also the past participle of "to die", and > > participles can usually be used as adjectives. Maybe it > > sounds wrong to me because there's already a specifically > > adjectival form: "dead". So "dead-stream-threads" seems more > > natural. > > > > However, I'm not sure if that captures the specific meaning > > you're shooting for, namely that the metric counts only the > > threads that died exceptionally, vs. from calling > > "removeStreamThread()". What do you think of "crashed- > > stream-threads" instead? > > > > Thanks, > > -John > > > > On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote: > >> Hi, > >> > >> I updated the KIP with the feedback so far. I removed the API to close > >> the Kafka Streams client asynchronously, since it should be possible to > >> avoid the deadlock with the existing method and without a KIP. > >> > >> Please have a look at the updated KIP and let me know what you think. 
> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads > >> > >> Best, > >> Bruno > >> > >> On 26.08.20 16:31, Bruno Cadonna wrote: > >>> Hi, > >>> > >>> I would like to propose the following KIP to start and shut down stream > >>> threads during execution as well as to shut down asynchronously a Kafka > >>> Streams client from an uncaught exception handler. > >>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients > >>> > >>> > >>> Best, > >>> Bruno > > >
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi John, your proposal makes sense! I will update the KIP. Best, Bruno On 01.09.20 17:31, John Roesler wrote: Hello Bruno, Thanks for the update! The KIP looks good to me; I only have a grammatical complaint about the proposed metric name. "Died" is a verb, the past tense of "to die", but in the expression,"x stream threads", x should be an adjective. To be fair, "died" is also the past participle of "to die", and participles can usually be used as adjectives. Maybe it sounds wrong to me because there's already a specifically adjectival form: "dead". So "dead-stream-threads" seems more natural. However, I'm not sure if that captures the specific meaning you're shooting for, namely that the metric counts only the threads that died exceptionally, vs. from calling "removeStreamThread()". What do you think of "crashed- stream-threads" instead? Thanks, -John On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote: Hi, I updated the KIP with the feedback so far. I removed the API to close the Kafka Streams client asynchronously, since it should be possible to avoid the deadlock with the existing method and without a KIP. Please have a look at the updated KIP and let me know what you think. https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads Best, Bruno On 26.08.20 16:31, Bruno Cadonna wrote: Hi, I would like to propose the following KIP to start and shut down stream threads during execution as well as to shut down asynchronously a Kafka Streams client from an uncaught exception handler. https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients Best, Bruno
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hello Bruno, Thanks for the update! The KIP looks good to me; I only have a grammatical complaint about the proposed metric name. "Died" is a verb, the past tense of "to die", but in the expression,"x stream threads", x should be an adjective. To be fair, "died" is also the past participle of "to die", and participles can usually be used as adjectives. Maybe it sounds wrong to me because there's already a specifically adjectival form: "dead". So "dead-stream-threads" seems more natural. However, I'm not sure if that captures the specific meaning you're shooting for, namely that the metric counts only the threads that died exceptionally, vs. from calling "removeStreamThread()". What do you think of "crashed- stream-threads" instead? Thanks, -John On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote: > Hi, > > I updated the KIP with the feedback so far. I removed the API to close > the Kafka Streams client asynchronously, since it should be possible to > avoid the deadlock with the existing method and without a KIP. > > Please have a look at the updated KIP and let me know what you think. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads > > Best, > Bruno > > On 26.08.20 16:31, Bruno Cadonna wrote: > > Hi, > > > > I would like to propose the following KIP to start and shut down stream > > threads during execution as well as to shut down asynchronously a Kafka > > Streams client from an uncaught exception handler. > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients > > > > > > > > Best, > > Bruno
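The distinction drawn above, counting only threads that died exceptionally and not those that ended normally, can be illustrated with plain Java threads. This is a sketch under assumptions: `FailedThreadCounter` and its methods are hypothetical, and it uses the standard `Thread.setUncaughtExceptionHandler` hook rather than any Kafka Streams metric code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a "crashed/failed threads" counter.
final class FailedThreadCounter {
    private final AtomicInteger failed = new AtomicInteger();

    // Creates a thread whose uncaught exceptions increment the counter.
    Thread newThread(final Runnable body) {
        final Thread thread = new Thread(body);
        thread.setUncaughtExceptionHandler((t, error) -> failed.incrementAndGet());
        return thread;
    }

    int failedThreads() {
        return failed.get();
    }

    // Runs one thread that ends normally and one that throws,
    // then reports how many were counted as failed.
    static int demo() {
        final FailedThreadCounter counter = new FailedThreadCounter();
        final Thread normal = counter.newThread(() -> { });
        final Thread crashing = counter.newThread(() -> {
            throw new IllegalStateException("simulated failure");
        });
        normal.start();
        crashing.start();
        try {
            normal.join();
            crashing.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.failedThreads();
    }

    public static void main(final String[] args) {
        System.out.println("failed threads: " + demo());
    }
}
```

A thread that returns from `run()` normally never reaches the handler, so a regular removal would not be counted.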
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi, I updated the KIP with the feedback so far. I removed the API to close the Kafka Streams client asynchronously, since it should be possible to avoid the deadlock with the existing method and without a KIP. Please have a look at the updated KIP and let me know what you think. https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads Best, Bruno On 26.08.20 16:31, Bruno Cadonna wrote: Hi, I would like to propose the following KIP to start and shut down stream threads during execution as well as to shut down asynchronously a Kafka Streams client from an uncaught exception handler. https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients Best, Bruno
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Thank you Matthias for the feedback. Please find my answers inline. Best, Bruno On 26.08.20 19:54, Matthias J. Sax wrote: Thanks for the KIP Bruno. While reading it, I had the same questions as raised by John and Walker (so I won't repeat them). In addition, I think that adding/removing threads should only be allowed if the client state is RUNNING (but not in any other state, maybe except ERROR). Furthermore, it seem that the methods must be `synchronized` similar to `start()` and `close()`. Good point about only adding stream threads in client state RUNNING. I will add that to the KIP. IMO `synchronized` is a implementation detail that we should discuss on the PR. While I understand that current `close(Duration.ZERO)` is not the same as `requestClose()`, I am wondering if we should change the semantics of `close()` instead of adding a new method though? As I wrote in my previous e-mails, I will rethink the addition of method requestClose(). Btw: for thread naming, I personally think that just using a counter (as we do right now) might be ok. If this becomes an issue, we could improve it later. -Matthias On 8/26/20 9:46 AM, Walker Carlson wrote: Hello Burno, Thanks for the KIP! Not to pile on, but I also had a couple additional questions. I am not super familiar with the StreamThread internals so please forgive any misconceptions if these are not relevant questions. 1. In requestClose if a thread does not close properly and deadlocks for some reason how will we avoid the client waiting on the thread to close? like when you try to close the Kafka Streams client from a thread UncaughtExcetiopnHandler now. The kip said it would improve the handling of these conditions, However I did not find it clear what strategy this improvement would use. Maybe handling broken threads is out of the scope of this KIP or am I missing something? 2a. Will the removal of Stream Threads in state DEAD be automatic? 
And will it be for all in that state or just for those closed with shutDownStreamThread? 2b. From the wording it seems that removing DEAD threads form the Kafka Streams client will be a new feature of this kip. If that is the case is the reasonable possibility that keeping the dead threads in metadata might be useful? For example if a thread is continually erroring and restarting a replacement 3. Maybe instead of addThread() we could use startNewThread()? I agree with John that startStreamThread could easily be misinterpreted i.e. as startStreamThreads. Thanks, Walker On Wed, Aug 26, 2020 at 8:48 AM John Roesler wrote: Hi Bruno, Thanks for the well motivated and throrough KIP! It's a good point that the record cache should be re- distributed over the threads. Reading your KIP leads me to a few questions: 1. Start vs. Add Maybe this is paranoid, but I'm mildly concerned that users who don't read the docs too carefully might think they need to call "start thread" to start their application's threads after calling `KafkaStreams.start` to start the app. Definitely not sure about this, but I'm wondering if it would be more clear to say `addThread` and `remove/dropThread` to make it clear that we are adding to or subtracting from the total number of threads, not just starting and stopping threads that are already in the pool. 2. requestClose() vs. close(Duration.ZERO) It's a very good point about deadlocks. Can you explain why we need a new method, though? The specified behavior seems the same as `close(Duration.ZERO)`. 3. Thread Naming Maybe this point is silly, but how will newly added threads be numbered? Will dead and hence removed threads' names be reused? Or will there be a monotonic counter for the lifetime of the instance? It seems relevant to mention this because it will affect metrics and logs. 
I guess _maybe_ it would be nice for the thread that replaces a crashed thread to take over its name, but since the crashed thread still exists while the UncaughtExceptionHandler is executing, its name wouldn't be up for grabs in any case yet. On the other hand, it might be nicer for operators to be able to distinguish the logs/metrics of the replacement thread from the dead one, so not reusing thread names might be better. On the other hand, not reusing thread names in a "replacement" exception handler means that a crashy application would report an unbounded number of thread ids over its lifespan. This might be an issue for people using popular metrics aggregation services that charge per unique combination of metrics tags. Then again, maybe this is a pathological case not worth considering. And yes, I realized I just implied that I have three hands. 4. ERROR State Can you elaborate why users explicitly stopping all threads should put the application into ERROR state? It does seem like it's not exactly "running" at that point, but it also doesn't seem like an error. Right now, ERROR is a terminal state that indicates users must discard the application instance
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Wlaker (pun intended),

Thank you for your feedback! Please find my answers inline.

Best,
Bruno

On 26.08.20 18:46, Walker Carlson wrote:
> Hello Burno,
>
> Thanks for the KIP!
>
> Not to pile on, but I also had a couple additional questions. I am not super familiar with the StreamThread internals so please forgive any misconceptions if these are not relevant questions.
>
> 1. In requestClose, if a thread does not close properly and deadlocks for some reason, how will we avoid the client waiting on the thread to close? Like when you try to close the Kafka Streams client from a thread's UncaughtExceptionHandler now. The KIP said it would improve the handling of these conditions; however, I did not find it clear what strategy this improvement would use. Maybe handling broken threads is out of the scope of this KIP or am I missing something?

The main goal of requestClose() is to avoid the deadlock when closing the Kafka Streams client from the uncaught exception handler of a stream thread. The case in which the close may encounter broken stream threads is orthogonal to this KIP, IMO. But as stated in my previous e-mail, I will rethink the necessity of a method requestClose().

> 2a. Will the removal of Stream Threads in state DEAD be automatic? And will it be for all in that state or just for those closed with shutDownStreamThread?

Yes, it would be automatic and it would be for all. I will add some words about that in the KIP.

> 2b. From the wording it seems that removing DEAD threads from the Kafka Streams client will be a new feature of this KIP. If that is the case, is there a reasonable possibility that keeping the dead threads in metadata might be useful? For example if a thread is continually erroring and restarting a replacement

Yes, that is a good question. I do not know how reasonable it is to keep DEAD stream threads around. However, what we could do is introduce a metric in the KIP to keep track of the stream threads that have died. IMO, such a metric is better than keeping around DEAD stream threads. I will add the metric to the KIP.

> 3. Maybe instead of addThread() we could use startNewThread()? I agree with John that startStreamThread could easily be misinterpreted, i.e. as startStreamThreads.

I will change the names in the KIP.

> Thanks,
> Walker
>
> On Wed, Aug 26, 2020 at 8:48 AM John Roesler wrote:
>> Hi Bruno,
>>
>> Thanks for the well motivated and thorough KIP!
>>
>> It's a good point that the record cache should be redistributed over the threads.
>>
>> Reading your KIP leads me to a few questions:
>>
>> 1. Start vs. Add
>>
>> Maybe this is paranoid, but I'm mildly concerned that users who don't read the docs too carefully might think they need to call "start thread" to start their application's threads after calling `KafkaStreams.start` to start the app. Definitely not sure about this, but I'm wondering if it would be more clear to say `addThread` and `remove/dropThread` to make it clear that we are adding to or subtracting from the total number of threads, not just starting and stopping threads that are already in the pool.
>>
>> 2. requestClose() vs. close(Duration.ZERO)
>>
>> It's a very good point about deadlocks. Can you explain why we need a new method, though? The specified behavior seems the same as `close(Duration.ZERO)`.
>>
>> 3. Thread Naming
>>
>> Maybe this point is silly, but how will newly added threads be numbered? Will dead and hence removed threads' names be reused? Or will there be a monotonic counter for the lifetime of the instance?
>>
>> It seems relevant to mention this because it will affect metrics and logs. I guess _maybe_ it would be nice for the thread that replaces a crashed thread to take over its name, but since the crashed thread still exists while the UncaughtExceptionHandler is executing, its name wouldn't be up for grabs in any case yet.
>>
>> On the other hand, it might be nicer for operators to be able to distinguish the logs/metrics of the replacement thread from the dead one, so not reusing thread names might be better.
>>
>> On the other hand, not reusing thread names in a "replacement" exception handler means that a crashy application would report an unbounded number of thread ids over its lifespan. This might be an issue for people using popular metrics aggregation services that charge per unique combination of metrics tags. Then again, maybe this is a pathological case not worth considering.
>>
>> And yes, I realized I just implied that I have three hands.
>>
>> 4. ERROR State
>>
>> Can you elaborate why users explicitly stopping all threads should put the application into ERROR state? It does seem like it's not exactly "running" at that point, but it also doesn't seem like an error.
>>
>> Right now, ERROR is a terminal state that indicates users must discard the application instance and create a new one. If there is still a possibility that we'd need a terminally corrupted state, it would probably be a mistake to add an out-transition from it.
>>
>> The documentation on that state says that it happens when all
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi John, Thank you for your feedback! Please find my answers inline. Best, Bruno On 26.08.20 17:49, John Roesler wrote: Hi Bruno, Thanks for the well motivated and throrough KIP! It's a good point that the record cache should be re- distributed over the threads. Reading your KIP leads me to a few questions: 1. Start vs. Add Maybe this is paranoid, but I'm mildly concerned that users who don't read the docs too carefully might think they need to call "start thread" to start their application's threads after calling `KafkaStreams.start` to start the app. Definitely not sure about this, but I'm wondering if it would be more clear to say `addThread` and `remove/dropThread` to make it clear that we are adding to or subtracting from the total number of threads, not just starting and stopping threads that are already in the pool. This makes sense to me. I will rename the methods. 2. requestClose() vs. close(Duration.ZERO) It's a very good point about deadlocks. Can you explain why we need a new method, though? The specified behavior seems the same as `close(Duration.ZERO)`. You are right, `close(Duration.ZERO)` would avoid the deadlock. However, `close(Duration.ZERO)` does not guarantee you that all resources get closed. Furthermore, it is error-prone to rely on users to use the correct overload. One possibility to avoid that users use the wrong overload is to check in `close()` if the calling thread is a stream thread and in that case to call `close(0)` instead of `close(Long.MAX_VALUE)`. Maybe, I could also find a solution for the resource issue. Let me think about it. In conclusion, I agree that a new method may not be needed. 3. Thread Naming Maybe this point is silly, but how will newly added threads be numbered? Will dead and hence removed threads' names be reused? Or will there be a monotonic counter for the lifetime of the instance? It seems relevant to mention this because it will affect metrics and logs. 
> I guess _maybe_ it would be nice for the thread that replaces a crashed thread to take over its name, but since the crashed thread still exists while the UncaughtExceptionHandler is executing, its name wouldn't be up for grabs in any case yet.
>
> On the other hand, it might be nicer for operators to be able to distinguish the logs/metrics of the replacement thread from the dead one, so not reusing thread names might be better.
>
> On the other hand, not reusing thread names in a "replacement" exception handler means that a crashy application would report an unbounded number of thread ids over its lifespan. This might be an issue for people using popular metrics aggregation services that charge per unique combination of metrics tags. Then again, maybe this is a pathological case not worth considering.
>
> And yes, I realized I just implied that I have three hands.

We definitely cannot reuse the name of a crashed stream thread immediately. What we could do is keep a list of previously used names (or secondhand names, to take up your hand metaphor) that are now free and reuse them for new stream threads. If there are no secondhand names, a counter is increased and a new name is created. The list of secondhand names would be bounded by the maximum number of stream threads that were running contemporaneously. I guess that would not be too complex to implement and would avoid the pathological case. IMO, it would be less annoying to have a new stream thread's metrics monitored in a graph of a stream thread that previously crashed than to run into the pathological case that costs money.

> 4. ERROR State
>
> Can you elaborate why users explicitly stopping all threads should put the application into ERROR state? It does seem like it's not exactly "running" at that point, but it also doesn't seem like an error.
>
> Right now, ERROR is a terminal state that indicates users must discard the application instance and create a new one.
> If there is still a possibility that we'd need a terminally corrupted state, it would probably be a mistake to add an out-transition from it.
>
> The documentation on that state says that it happens when all StreamThreads die or when the Global thread dies. We have a proposal from Navinder (KIP-406) to allow the Global thread to automatically come back to life after some errors, but presumably others would still be fatal.
>
> I guess your reasoning is that if the cause of the ERROR state happens to be just from all the StreamThreads dying, and if there's a way to replace them, then it should be possible to recover from this ERROR state.
>
> Maybe that makes sense, and we should just note that if the Global thread is dead, it won't be possible to transition out of ERROR state.

Here, I was torn between ERROR and RUNNING. I guess the key question here is: What is the meaning of ERROR? Is it a terminal state that signals a fatal error that cannot be recovered from without a restart of the client? If yes, there should not be any transition from ERROR to any state. I
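The name-reuse scheme described in the reply to point 3 can be sketched in plain Java. This is only an illustration under stated assumptions: `ThreadNameAllocator`, `acquire()` and `release()` are hypothetical names that do not exist in Kafka Streams, and the `-StreamThread-<n>` suffix is assumed for the example. Freed indexes are kept in a sorted set and handed out again before the counter is advanced, so the number of distinct names never exceeds the largest number of threads alive at the same time.

```java
import java.util.TreeSet;

/** Hypothetical helper, not part of Kafka Streams: hands out stream thread names. */
final class ThreadNameAllocator {
    private final String prefix;
    // Indexes of names that were used before and are free again ("secondhand" names).
    private final TreeSet<Integer> freeIndexes = new TreeSet<>();
    // Next never-used index; only advanced when no secondhand name is available.
    private int nextIndex = 1;

    ThreadNameAllocator(String prefix) {
        this.prefix = prefix;
    }

    /** Returns a secondhand name if one is free, otherwise creates a new one. */
    synchronized String acquire() {
        Integer reused = freeIndexes.pollFirst();
        int index = (reused != null) ? reused : nextIndex++;
        return prefix + "-StreamThread-" + index;
    }

    /** Marks a name as free once its thread is gone, e.g. after the exception handler returned. */
    synchronized void release(String name) {
        int index = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
        freeIndexes.add(index);
    }
}
```

With this sketch, acquiring two names, releasing the first and acquiring twice more yields index 1 again and then index 3. A replacement thread therefore inherits the metrics tags of the thread it replaces, which is the trade-off discussed above.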
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Thanks for the KIP Bruno.

While reading it, I had the same questions as raised by John and Walker (so I won't repeat them).

In addition, I think that adding/removing threads should only be allowed if the client state is RUNNING (but not in any other state, maybe except ERROR). Furthermore, it seems that the methods must be `synchronized`, similar to `start()` and `close()`.

While I understand that the current `close(Duration.ZERO)` is not the same as `requestClose()`, I am wondering if we should change the semantics of `close()` instead of adding a new method?

Btw: for thread naming, I personally think that just using a counter (as we do right now) might be ok. If this becomes an issue, we could improve it later.

-Matthias

On 8/26/20 9:46 AM, Walker Carlson wrote:

> Hello Bruno,
>
> Thanks for the KIP!
>
> Not to pile on, but I also had a couple of additional questions. I am not super familiar with the StreamThread internals, so please forgive any misconceptions if these are not relevant questions.
>
> 1. In requestClose, if a thread does not close properly and deadlocks for some reason, how will we avoid the client waiting on the thread to close, like when you try to close the Kafka Streams client from a thread's UncaughtExceptionHandler now?
>
> The KIP said it would improve the handling of these conditions. However, I did not find it clear what strategy this improvement would use. Maybe handling broken threads is out of the scope of this KIP, or am I missing something?
>
> 2a. Will the removal of stream threads in state DEAD be automatic? And will it be for all in that state or just for those closed with shutDownStreamThread?
>
> 2b. From the wording it seems that removing DEAD threads from the Kafka Streams client will be a new feature of this KIP. If that is the case, is there a reasonable possibility that keeping the dead threads in metadata might be useful? For example if a thread is continually erroring and restarting a replacement
>
> 3. Maybe instead of addThread() we could use startNewThread()? I agree with John that startStreamThread could easily be misinterpreted, i.e. as startStreamThreads.
>
> Thanks,
> Walker
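Matthias's two constraints, that threads may only be added or removed while the client is RUNNING and that the methods are `synchronized` like `start()` and `close()`, might look roughly like the following. `ResizableClient` is a hypothetical stand-in and not `KafkaStreams`; its state enum is reduced to what the example needs (ERROR is left out), and returning an empty `Optional` outside RUNNING is just one possible choice. Names come from a plain counter, as he suggests.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical stand-in for a Streams client; the state set is simplified. */
final class ResizableClient {
    enum State { CREATED, RUNNING, NOT_RUNNING }

    private final int initialThreads;
    private final Deque<String> threads = new ArrayDeque<>();
    private State state = State.CREATED;
    private int counter = 0; // plain monotonic counter, names are never reused

    ResizableClient(int initialThreads) {
        this.initialThreads = initialThreads;
    }

    synchronized void start() {
        if (state != State.CREATED) {
            throw new IllegalStateException("client was already started");
        }
        for (int i = 0; i < initialThreads; i++) {
            threads.addLast(nextName());
        }
        state = State.RUNNING;
    }

    /** Adds a thread only while RUNNING; returns its name, or empty if refused. */
    synchronized Optional<String> addStreamThread() {
        if (state != State.RUNNING) {
            return Optional.empty();
        }
        String name = nextName();
        threads.addLast(name);
        return Optional.of(name);
    }

    /** Removes the most recently added thread only while RUNNING. */
    synchronized Optional<String> removeStreamThread() {
        if (state != State.RUNNING || threads.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(threads.removeLast());
    }

    synchronized void close() {
        threads.clear();
        state = State.NOT_RUNNING;
    }

    synchronized int threadCount() {
        return threads.size();
    }

    private String nextName() {
        return "StreamThread-" + (++counter);
    }
}
```

Because every method holds the same monitor, a resize cannot interleave with `start()` or `close()`, which is the property the `synchronized` remark is after.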
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hello Bruno,

Thanks for the KIP!

Not to pile on, but I also had a couple of additional questions. I am not super familiar with the StreamThread internals, so please forgive any misconceptions if these are not relevant questions.

1. In requestClose, if a thread does not close properly and deadlocks for some reason, how will we avoid the client waiting on the thread to close, like when you try to close the Kafka Streams client from a thread's UncaughtExceptionHandler now?

The KIP said it would improve the handling of these conditions. However, I did not find it clear what strategy this improvement would use. Maybe handling broken threads is out of the scope of this KIP, or am I missing something?

2a. Will the removal of stream threads in state DEAD be automatic? And will it be for all in that state or just for those closed with shutDownStreamThread?

2b. From the wording it seems that removing DEAD threads from the Kafka Streams client will be a new feature of this KIP. If that is the case, is there a reasonable possibility that keeping the dead threads in metadata might be useful? For example if a thread is continually erroring and restarting a replacement

3. Maybe instead of addThread() we could use startNewThread()? I agree with John that startStreamThread could easily be misinterpreted, i.e. as startStreamThreads.

Thanks,
Walker
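Walker's first question concerns a close call issued from one of the client's own threads, for example from an uncaught exception handler. A minimal plain-Java sketch of one way to avoid waiting on the calling thread follows. `SelfAwareClient` and its methods are hypothetical and not Kafka Streams API, and the KIP may well settle on a different strategy. The idea is that `close` detects that the caller is one of its workers and then only signals shutdown instead of joining.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical client that owns worker threads; not part of Kafka Streams. */
final class SelfAwareClient {
    private final List<Thread> workers = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    /** Registers the worker before starting it, so close() can recognise it as a caller. */
    Thread addWorker(Runnable body) {
        Thread worker = new Thread(body, "worker-" + (workers.size() + 1));
        workers.add(worker);
        worker.start();
        return worker;
    }

    boolean isRunning() {
        return running;
    }

    /**
     * Signals shutdown and waits up to timeoutMs for the workers to finish.
     * Returns true only if every worker was observed to have terminated.
     */
    boolean close(long timeoutMs) {
        running = false;
        Thread caller = Thread.currentThread();
        // A worker that waited for all workers would wait for itself, so it does not wait at all.
        long waitMs = workers.contains(caller) ? 0L : Math.max(0L, timeoutMs);
        long now = System.currentTimeMillis();
        long deadline = waitMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + waitMs;
        boolean allStopped = true;
        for (Thread worker : workers) {
            if (worker != caller) {
                long remaining = deadline - System.currentTimeMillis();
                try {
                    if (remaining > 0) {
                        worker.join(remaining); // join(0) would wait forever, hence the guard
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            allStopped &= !worker.isAlive();
        }
        return allStopped;
    }
}
```

When a worker calls `close` on its own client, the call returns `false` right away because the calling worker is still alive, and no resources are guaranteed to be released at that point. This mirrors the limitation of a zero-timeout close that the thread discusses and leaves open how a truly broken thread would be handled.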
Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi Bruno,

Thanks for the well motivated and thorough KIP!

It's a good point that the record cache should be redistributed over the threads.

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users who don't read the docs too carefully might think they need to call "start thread" to start their application's threads after calling `KafkaStreams.start` to start the app. Definitely not sure about this, but I'm wondering if it would be more clear to say `addThread` and `remove/dropThread` to make it clear that we are adding to or subtracting from the total number of threads, not just starting and stopping threads that are already in the pool.

2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why we need a new method, though? The specified behavior seems the same as `close(Duration.ZERO)`.

3. Thread Naming

Maybe this point is silly, but how will newly added threads be numbered? Will dead and hence removed threads' names be reused? Or will there be a monotonic counter for the lifetime of the instance?

It seems relevant to mention this because it will affect metrics and logs. I guess _maybe_ it would be nice for the thread that replaces a crashed thread to take over its name, but since the crashed thread still exists while the UncaughtExceptionHandler is executing, its name wouldn't be up for grabs in any case yet.

On the other hand, it might be nicer for operators to be able to distinguish the logs/metrics of the replacement thread from the dead one, so not reusing thread names might be better.

On the other hand, not reusing thread names in a "replacement" exception handler means that a crashy application would report an unbounded number of thread ids over its lifespan. This might be an issue for people using popular metrics aggregation services that charge per unique combination of metrics tags. Then again, maybe this is a pathological case not worth considering.

And yes, I realized I just implied that I have three hands.

4. ERROR State

Can you elaborate why users explicitly stopping all threads should put the application into ERROR state? It does seem like it's not exactly "running" at that point, but it also doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users must discard the application instance and create a new one. If there is still a possibility that we'd need a terminally corrupted state, it would probably be a mistake to add an out-transition from it.

The documentation on that state says that it happens when all StreamThreads die or when the Global thread dies. We have a proposal from Navinder (KIP-406) to allow the Global thread to automatically come back to life after some errors, but presumably others would still be fatal.

I guess your reasoning is that if the cause of the ERROR state happens to be just from all the StreamThreads dying, and if there's a way to replace them, then it should be possible to recover from this ERROR state.

Maybe that makes sense, and we should just note that if the Global thread is dead, it won't be possible to transition out of ERROR state.

5. Global thread

Should it be in the scope of this KIP to consider replacing the GlobalStreamThread?

6. Restarting

It seems like, if I configure Streams to have e.g. 1 thread and then add more threads over its lifetime (maybe up to 8), then I might be dismayed after a restart of the app to see it went back to 1. I guess it could just be my job as a user to update the config to match when I add or remove threads. Maybe that's the best place to land right now. It still might be a good point to mention in the KIP (and the docs).

Thanks again!
-John

On Wed, 2020-08-26 at 16:31 +0200, Bruno Cadonna wrote:

> Hi,
>
> I would like to propose the following KIP to start and shut down stream threads during execution as well as to shut down asynchronously a Kafka Streams client from an uncaught exception handler.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients
>
> Best,
> Bruno
[DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients
Hi, I would like to propose the following KIP to start and shut down stream threads during execution as well as to shut down asynchronously a Kafka Streams client from an uncaught exception handler. https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients Best, Bruno