Thanks Bruno, your replies make sense to me. As for localThreadsMetadata() itself, I'd like to clarify whether it would return every thread that is still bookkept, or whether it would specifically filter out DEAD threads even if they have not yet been removed.
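To make the question concrete, here is a minimal sketch of the filtering users would have to do themselves if localThreadsMetadata() keeps returning DEAD threads that are still bookkept. ThreadInfo is a hypothetical stand-in for the real ThreadMetadata class (only a name and a state are modeled), and the thread names are made up for illustration; this is not how the KIP would implement it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class AliveThreadFilter {

    // Hypothetical stand-in for Kafka Streams' ThreadMetadata; only the
    // two fields needed for this sketch are modeled.
    static final class ThreadInfo {
        final String name;
        final String state;

        ThreadInfo(String name, String state) {
            this.name = name;
            this.state = state;
        }
    }

    // Returns the names of all threads whose state is not DEAD. This is the
    // user-side filtering that would be needed if dead threads were still
    // reported in the metadata.
    static List<String> nonDeadThreadNames(List<ThreadInfo> threads) {
        return threads.stream()
                .filter(t -> !"DEAD".equals(t.state))
                .map(t -> t.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed sample data: one dead thread that is still bookkept.
        List<ThreadInfo> metadata = Arrays.asList(
                new ThreadInfo("StreamThread-1", "RUNNING"),
                new ThreadInfo("StreamThread-2", "DEAD"),
                new ThreadInfo("StreamThread-3", "PENDING_SHUTDOWN"));

        List<String> names = nonDeadThreadNames(metadata);
        System.out.println(names + " (" + names.size() + " non-dead threads)");
    }
}
```

If the method filtered DEAD threads itself, callers could use the returned collection directly and simply call size() on it.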
Otherwise, the KIP LGTM.

Guozhang

On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> Thank you for your feedback. I replied inline.
>
> Best,
> Bruno
>
> On 09.09.20 23:43, Guozhang Wang wrote:
> > Hello Bruno,
> >
> > Finally got some time to review your KIP and the discussion thread now. A few comments below:
> >
> > 1) I'm with Matthias about the newly added numberOfAliveStreamThreads vs. the existing localThreadsMetadata: to me it seems we can always achieve the first based on the second. It does not seem worthwhile to add "syntactic sugar" to the API; we can just let users do the filtering themselves.
>
> I am not married to that method. I removed it.
>
> > Furthermore, I'm wondering what's the rationale behind removing the DEAD threads from localThreadsMetadata()? Personally I feel returning all threads, including those that were ever closed, either due to an exception or due to removeStreamThread, would be beneficial for debugging purposes, as long as within the lifetime of an instance we expect the number of such dead threads will not increase linearly --- and if we agree with that, maybe we can rename "removeStreamThread" to sth. like "terminateStreamThread" indicating it is only terminated but not removed --- and of course if users do not want to see those DEAD threads they can always filter them out. I'm just proposing that we should still leave the door open for those who want to check those ever terminated threads.
>
> I actually think the number of dead stream threads might increase linearly. Assume users have a systematic error that continuously kills a stream thread and they blindly start a new stream thread in the uncaught exception handler. This scenario might be a mistake but if the systematic error does not occur at a high rate, it could also be a strategy to keep the application running during the investigation of the systematic error.
>
> IMO, removing dead stream threads makes Kafka Streams more robust because it prevents a possibly unbounded increase of memory usage. If users want to debug the dead stream threads they can monitor the number of dead threads with the metric proposed in the KIP and they could additionally log the metadata of the dying stream thread in the uncaught exception handler. I do not think that there is a need to keep dead stream threads around.
>
> > 2) I think it would help to write down some example user code in the exception handler, e.g. to illustrate how this would be implemented -- e.g. we know that practically the handler needs to maintain a "this" reference of the instance anyways in order to shut down the whole instance or add/terminate threads dynamically, but I want to see if we have listed all possible call paths like: a) a thread's handler logic to terminate another thread, b) a thread handler to add new threads, etc. are all appropriately supported without deadlocks.
>
> I added an example for an uncaught exception handler that adds a stream thread to the KIP. Removing a stream thread in an uncaught exception handler doesn't seem a common use case to me. Nevertheless, we need to make sure that we do not run into a deadlock in that case. I will consider that during the implementation and write tests to check for deadlocks.
>
> Shutting down the Kafka Streams client from inside an uncaught exception handler is outside the scope of this KIP. In the beginning it was part of the KIP, but during the discussion it turned out that we can fix our existing close() method to accomplish the shutdown from inside an uncaught exception handler. But I completely agree with you that we need to ensure that we do not run into a deadlock in this case.
>
> > Guozhang
> >
> > On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> I would prefer to not add a new method.
> >> It seems unnecessary. `localThreadsMetadata` does return all threads in all states(*) and thus provides full insight.
> >>
> >> (*) A thread in state DEAD could be returned as long as it's not removed yet.
> >>
> >> I don't see any advantage to pre-filter threads and to exclude threads in state CREATED or PENDING_SHUTDOWN. Even if a CREATED thread is not started yet, it is still "alive" in a broader sense. For example, if a user wants to scale out to 10 threads, and 8 are RUNNING and 2 are in state CREATED, a user won't need to add 2 more threads -- there are already 10 threads.
> >>
> >> For PENDING_SHUTDOWN and scale in it would be different I guess, as the proposal would be to filter them out right away. However, filtering them seems actually not to be "correct", as a thread in PENDING_SHUTDOWN might still process data and it's thus still "alive".
> >>
> >> If there is still a need later to add a new method about "alive thread" we can always add it as a follow-up -- removing things is much harder.
> >>
> >> I also don't think that there is value in returning names of dead threads, as we recycle names.
> >>
> >>
> >> -Matthias
> >>
> >> On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote:
> >>> I agree that the current behavior of localThreadsMetadata() does not seem to match, but it seems like we will be forced to change it to only return currently-alive threads. For one thing, we plan to recycle old thread names. It would be pretty confusing for a user to get two (or more) ThreadMetadata objects returned with the same name, since AFAICT this is the only distinguishing identifier of stream threads. I think we should enforce that only live threads are returned by localThreadsMetadata().
> >>> Plus, as Matthias pointed out, we plan to remove dead threads from the KafkaStreams client, so still returning them in the metadata would be extremely odd.
> >>>
> >>> If we think that there might be some use case that requires knowing which threads have died, we could consider adding a method that returns the names of dead threads. But the only use case I can imagine would probably be better served by a callback that gets invoked when the thread dies, which we already have.
> >>>
> >>> On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>
> >>>> Hi Matthias and Sophie,
> >>>>
> >>>> I agree that localThreadsMetadata() can be used here. However, localThreadsMetadata() returns all stream threads irrespective of their states. Alive stream threads are specified as being in one of the following states: RUNNING, STARTING, PARTITIONS_REVOKED, and PARTITIONS_ASSIGNED. Hence, users would need to filter the result of localThreadsMetadata(). I thought it would be neat to have a method that hides this filtering and returns the number of alive stream threads, because that is the most basic information you might need to decide about adding or removing stream threads. For all more advanced use cases users should use localThreadsMetadata(). I am also happy with removing the method. WDYT?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 09.09.20 03:51, Matthias J. Sax wrote:
> >>>>> Currently, we don't clean up dead threads, but the KIP proposes to change this:
> >>>>>
> >>>>>> Stream threads that are in state DEAD will be removed from the stream threads of a Kafka Streams client.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote:
> >>>>>> Ah, I forgot about localThreadsMetadata(). In that
> >>>>>> case I agree, there's no reason to introduce a new method when we can get both the names and number of all running threads from this.
> >>>>>>
> >>>>>> I assume that we would update localThreadsMetadata to only return currently alive threads as part of this KIP -- at a quick glance, it seems like we don't do any pruning of dead threads at the moment.
> >>>>>>
> >>>>>> On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>> I am not sure if we need a new method? There is already `localThreadsMetadata()`. What do we gain by adding a new one?
> >>>>>>>
> >>>>>>> Returning the thread's name (as `Optional<String>`) for both add() and remove() is fine with me.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
> >>>>>>>> Sorry Bruno, I think I missed the end of your message with the numberOfAliveStreamThreads() proposal. I agree, that would be better than the alternatives I listed. That said:
> >>>>>>>>
> >>>>>>>>> They rather suggest that the method returns a list of handles to the stream threads.
> >>>>>>>>
> >>>>>>>> I hadn't thought of that originally, but now that you mention it, this might be a good idea. I don't think we should return actual handles on the threads, but maybe a list of the thread names rather than a single number of currently alive threads.
> >>>>>>>>
> >>>>>>>> Since we seem to think it would be difficult if not impossible to keep track of the number of running stream threads, we should apply the same reasoning to the names and not assume the user can/will keep track of every thread returned by addStreamThread() or removeStreamThread().
> >>>>>>>> Users should generally take any required action immediately after adding/removing the thread -- e.g. deregistering the thread metrics -- but it might still be useful to provide a convenience method listing all of the current threads.
> >>>>>>>>
> >>>>>>>> And of course you could still get the number of threads easily by invoking size() on the returned list (or ordered set?).
> >>>>>>>>
> >>>>>>>> On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Thank you again for the feedback Sophie!
> >>>>>>>>>
> >>>>>>>>> As I tried to point out in my previous e-mail, removing a stream thread from a Kafka Streams client that does not have alive stream threads is nothing exceptional for the client per se. However, it can become exceptional within the context of the user. For example, if users want to remove a stream thread from a client without alive stream threads because one of their metrics says so, then this is exceptional in the context of that user metric, not in the context of the Kafka Streams client. In that case, users should throw an exception and handle it.
> >>>>>>>>>
> >>>>>>>>> Regarding returning null, I do not like to return null because from a development point of view there is no distinction between returning null because we have a bug in the code or returning null because there are no alive stream threads. Additionally, Optional<String> makes it more explicit that the result could also be empty.
> >>>>>>>>>
> >>>>>>>>> Thank you for the alternative method names!
> >>>>>>>>> However, with the names you propose it is not immediately clear that the method returns a number of stream threads. They rather suggest that the method returns a list of handles to the stream threads. I chose to use "aliveStreamThreads" to be consistent with the client-level metric "alive-stream-threads" which reports the same number of stream threads that numberOfAliveStreamThreads() should report. If others also think that the proposed name in the KIP is too clumsy, I am open to renaming it, though.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 08.09.20 20:12, Sophie Blee-Goldman wrote:
> >>>>>>>>>>> it's never a good sign when the discussion moves into the vote thread
> >>>>>>>>>>
> >>>>>>>>>> Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes again. Thanks for redirecting me Bruno
> >>>>>>>>>>
> >>>>>>>>>> I suppose it's unfair to expect the callers to keep perfect track of the current number of stream threads, but it also seems like you shouldn't be calling removeStreamThread() when there are no threads left. Either you're just haphazardly removing threads and could unintentionally slip into a state of no running threads without realizing it, or more realistically, you're carefully removing threads based on some metric(s) that convey whether the system is over or under-provisioned. If your metrics say you're over-provisioned but there's not one thread running, well, that certainly sounds exceptional to me.
> >>>>>>>>>> Or you might be right in that the cluster is over-provisioned but have just been directing the removeStreamThread() and addStreamThread() calls to instances at random, and end up with one massive instance and one with no threads at all. Again, this probably merits some human intervention (or system redesign).
> >>>>>>>>>>
> >>>>>>>>>> That said, I don't think there's any real harm to just returning null in this case, but I hope that users would pay attention to this since it seems likely to indicate something has gone seriously wrong. I suppose Optional<String> would be a reasonable compromise.
> >>>>>>>>>>
> >>>>>>>>>> As for the method name, what about activeStreamThreads() or liveStreamThreads()?
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Sep 7, 2020 at 1:45 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi John,
> >>>>>>>>>>>
> >>>>>>>>>>> I agree with you except for checking null. I would rather use Optional<String> as the return type of both methods.
> >>>>>>>>>>>
> >>>>>>>>>>> I changed the subject from [VOTE] to [DISCUSS] so that we can follow up in the discussion thread.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>> On 04.09.20 23:12, John Roesler wrote:
> >>>>>>>>>>>> Hi Sophie,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Uh, oh, it's never a good sign when the discussion moves into the vote thread :)
> >>>>>>>>>>>>
> >>>>>>>>>>>> I agree with you, it seems like a good touch for removeStreamThread() to return the name of the thread that got removed, rather than a boolean flag.
> >>>>>>>>>>>> Maybe the return value would be `null` if there is no thread to remove.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If we go that way, I'd suggest that addStreamThread() also return the name of the newly created thread, or null if no thread can be created right now.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm not completely sure if I think that callers of this method would know exactly how many threads there are. Sure, if a human being is sitting there looking at the metrics or logs and decides to call the method, it would work out, but I'd expect this kind of method to find its way into automated tooling that reacts to things like current system load or resource saturation. Those kinds of toolchains often are part of a distributed system, and it's probably not that easy to guarantee that the thread count they observe is fully consistent with the number of threads that are actually running. Therefore, an in-situ `int numStreamThreads()` method might not be a bad idea. Then again, it seems sort of optional. A caller can catch an exception or react to a `null` return value just the same either way. Having both add/remove methods behave similarly is probably more valuable.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> -John
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman wrote:
> >>>>>>>>>>>>> Hey, sorry for the late reply, I just have one minor suggestion.
> >>>>>>>>>>>>> Since we don't make any guarantees about which thread gets removed or allow the user to specify, I think we should return either the index or full name of the thread that does get removed by removeStreamThread().
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I know you just updated the KIP to return true/false if there are/aren't any threads to be removed, but I think this would be more appropriate as an exception than as a return type. I think it's reasonable to expect users to have some sense of how many threads are remaining, and not try to remove a thread when there is none left. To me, that indicates something wrong with the user application code and should be treated as an exceptional case. I don't think the same code clarity argument applies here as to the addStreamThread() case, as there's no reason for an application to be looping and retrying removeStreamThread() since if that fails, it's because there are no threads left and thus it will continue to always fail. And if the user actually wants to shut down all threads, they should just close the whole application rather than call removeStreamThread() in a loop.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> While I generally think it should be straightforward for users to track how many stream threads they have running, maybe it would be nice to add a small utility method that does this for them.
> >>>>>>>>>>>>> Something like
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // Returns the number of currently alive threads
> >>>>>>>>>>>>> int runningStreamThreads();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> +1 (binding)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 9/3/20 6:16 AM, Bruno Cadonna wrote:
> >>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would like to start the voting on KIP-663 that proposes to add methods to the Kafka Streams client to add and remove stream threads during execution.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Bruno

--
-- Guozhang