Thanks Bruno, your replies make sense to me. As for localThreadsMetadata()
itself, I'd like to clarify whether it would return all threads that are still
bookkept, or whether it would specifically filter out DEAD threads even if
they have not yet been removed.
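
To make the question concrete, here is a minimal sketch of the filtering a
caller would have to do if DEAD threads that are still bookkept were returned.
It is only an illustration under assumptions: ThreadInfo is a hypothetical
stand-in for ThreadMetadata (so the snippet compiles without Kafka on the
classpath), the thread names are made up, and the state is assumed to be
exposed as a plain String such as "DEAD".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for ThreadMetadata; only the two fields needed here.
final class ThreadInfo {
    private final String threadName;
    private final String threadState;

    ThreadInfo(String threadName, String threadState) {
        this.threadName = threadName;
        this.threadState = threadState;
    }

    String threadName() { return threadName; }
    String threadState() { return threadState; }
}

final class ThreadFilterSketch {
    // Keeps every thread that is not DEAD, so threads in CREATED or
    // PENDING_SHUTDOWN are still part of the result.
    static List<ThreadInfo> withoutDead(List<ThreadInfo> all) {
        return all.stream()
                  .filter(t -> !"DEAD".equals(t.threadState()))
                  .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for a localThreadsMetadata() result.
        List<ThreadInfo> all = Arrays.asList(
            new ThreadInfo("stream-thread-1", "RUNNING"),
            new ThreadInfo("stream-thread-2", "DEAD"),
            new ThreadInfo("stream-thread-3", "PENDING_SHUTDOWN"));
        for (ThreadInfo t : withoutDead(all)) {
            System.out.println(t.threadName() + " " + t.threadState());
        }
    }
}
```

If localThreadsMetadata() already drops DEAD threads, callers would not need
this filter at all.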

Otherwise, the KIP LGTM.

Guozhang

On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Guozhang,
>
> Thank you for your feedback. I replied inline.
>
> Best,
> Bruno
>
> On 09.09.20 23:43, Guozhang Wang wrote:
> > Hello Bruno,
> >
> > Finally got some time to review your KIP and the discussion thread now. A
> > few comments below:
> >
> > 1) I'm with Matthias about the newly added numberOfAliveStreamThreads vs.
> > the existing localThreadsMetadata: to me it seems we can always achieve the
> > first based on the second. It does not seem worthwhile to add "syntax
> > sugar" to the API; we can just let users do the filtering themselves.
>
> I am not married to that method. I removed it.
>
> > Furthermore, I'm wondering what's the rationale behind removing the DEAD
> > threads from localThreadsMetadata()? Personally I feel returning all
> > threads, including those who are ever closed, either due to exception or
> > due to removeStreamThread, would be beneficial for debugging purposes, as
> > long as within the lifetime of an instance we expect the amount of such
> > dead threads will not increase linearly --- and if we agree with that,
> > maybe we can rename "removeStreamThread" to sth. like
> > "terminateStreamThread" indicating it is only terminated but not removed
> > --- and of course if users do not want to see those DEAD threads they can
> > always filter them out. I'm just proposing that we should still leave the
> > door open for those who want to check those ever terminated threads.
> >
>
> I actually think the number of dead stream threads might increase
> linearly. Assume users have a systematic error that continuously kills a
> stream thread and they blindly start a new stream thread in the uncaught
> exception handler. This scenario might be a mistake but if the
> systematic error does not occur at a high rate, it could also be a
> strategy to keep the application running during the investigation of the
> systematic error.
>
> IMO, removing dead stream threads makes Kafka Streams more robust
> because it prevents a possibly unbounded increase of memory usage. If
> users want to debug the dead stream threads they can monitor the number
> of dead threads with the metric proposed in the KIP and they could
> additionally log the metadata of the dying stream thread in the uncaught
> exception handler. I do not think that there is a need to keep dead stream
> threads around.
>
> > 2) I think it would help to write down some example user code in the
> > exception handler, e.g. to illustrate how this would be implemented. We know
> > that practically the handler needs to maintain a "this" reference of the
> > instance anyway in order to shut down the whole instance or add/terminate
> > threads dynamically, but I want to see if we have listed all possible call
> > paths, like: a) a thread's handler logic to terminate another thread, b) a
> > thread handler to add new threads, etc., and that all are appropriately
> > supported without deadlocks.
> >
>
> I added an example for an uncaught exception handler that adds a stream
> thread to the KIP. Removing a stream thread in an uncaught exception
> handler doesn't seem a common use case to me. Nevertheless, we need to
> make sure that we do not run into a deadlock in that case. I will consider
> that during the implementation and write tests to check for deadlocks.
>
> Shutting down the Kafka Streams client from inside an uncaught exception
> handler is outside the scope of this KIP. In the beginning it was part
> of the KIP, but during the discussion it turned out that we can fix our
> existing close() method to accomplish the shutdown from inside an
> uncaught exception handler. But I completely agree with you that we need
> to ensure that we do not run into a deadlock in this case.
>
>
> >
> > Guozhang
> >
> >
> > On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> I would prefer to not add a new method. It seems unnecessary.
> >> `localThreadsMetadata()` does return all threads in all states(*) and thus
> >> provides full insight.
> >>
> >> (*) A thread in state DEAD could be returned as long as it's not removed
> >> yet.
> >>
> >> I don't see any advantage to pre-filter threads and to exclude threads
> >> in state CREATED or PENDING_SHUTDOWN. Even if a CREATED thread is not
> >> started yet, it is still "alive" in a broader sense. For example, if a
> >> user wants to scale out to 10 threads, and 8 are RUNNING and 2 are in
> >> state CREATED, a user won't need to add 2 more threads -- there are
> >> already 10 threads.
> >>
> >> For PENDING_SHUTDOWN and scale in it would be different I guess, as the
> >> proposal would be to filter them out right away. However, filtering them
> >> seems actually not to be "correct", as a thread in PENDING_SHUTDOWN
> >> might still process data and it's thus still "alive".
> >>
> >> If there is still a need later to add a new method about "alive thread"
> >> we can always add as a follow up -- removing things is much harder.
> >>
> >> I also don't think that there is value in returning names of dead
> >> threads, as we recycle names.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote:
> >>> I agree that the current behavior of localThreadsMetadata() does not seem
> >>> to match, but it seems like we will be forced to change it to only return
> >>> currently-alive threads. For one thing, we plan to recycle old thread names.
> >>> It would be pretty confusing for a user to get two (or more) ThreadMetadata
> >>> objects returned with the same name, since AFAICT this is the only
> >>> distinguishing identifier of stream threads. I think we should enforce that
> >>> only live threads are returned by localThreadsMetadata(). Plus, as Matthias
> >>> pointed out, we plan to remove dead threads from the KafkaStreams client,
> >>> so still returning them in the metadata would be extremely odd.
> >>>
> >>> If we think that there might be some use case that requires knowing which
> >>> threads have died, we could consider adding a method that returns the
> >>> names of dead threads. But the only use case I can imagine would probably
> >>> be better served by a callback that gets invoked when the thread dies, which
> >>> we already have.
> >>>
> >>> On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>
> >>>> Hi Matthias and Sophie,
> >>>>
> >>>> I agree that localThreadsMetadata() can be used here. However,
> >>>> localThreadsMetadata() returns all stream threads irrespective of
> >>>> their states. Alive stream threads are specified as being in one of the
> >>>> following states: RUNNING, STARTING, PARTITIONS_REVOKED, and
> >>>> PARTITIONS_ASSIGNED. Hence, users would need to filter the result of
> >>>> localThreadsMetadata(). I thought it would be neat to have a method
> >>>> that hides this filtering and returns the number of alive stream
> >>>> threads, because that is the most basic information you might need to
> >>>> decide about adding or removing stream threads. For all more advanced
> >>>> use cases users should use localThreadsMetadata(). I am also happy with
> >>>> removing the method. WDYT?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 09.09.20 03:51, Matthias J. Sax wrote:
> >>>>> Currently, we don't clean up dead threads, but the KIP proposes to change
> >>>>> this:
> >>>>>
> >>>>>> Stream threads that are in state DEAD will be removed from the stream
> >>>>>> threads of a Kafka Streams client.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote:
> >>>>>> Ah, I forgot about localThreadsMetadata(). In that case I agree, there's
> >>>>>> no reason to introduce a new method when we can get both the names and
> >>>>>> number of all running threads from this.
> >>>>>>
> >>>>>> I assume that we would update localThreadsMetadata to only return currently
> >>>>>> alive threads as part of this KIP -- at a quick glance, it seems like we
> >>>>>> don't do any pruning of dead threads at the moment
> >>>>>>
> >>>>>> On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>> I am not sure if we need a new method? There is already
> >>>>>>> `localThreadsMetadata()`. What do we gain by adding a new one?
> >>>>>>>
> >>>>>>> Returning the thread's name (as `Optional<String>`) for both add() and
> >>>>>>> remove() is fine with me.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
> >>>>>>>> Sorry Bruno, I think I missed the end of your message with the
> >>>>>>>> numberOfAliveStreamThreads() proposal. I agree, that would be better
> >>>>>>>> than the alternatives I listed. That said:
> >>>>>>>>
> >>>>>>>>> They rather suggest that the method returns a list of handles to the
> >>>>>>>>> stream threads.
> >>>>>>>>
> >>>>>>>> I hadn't thought of that originally, but now that you mention it, this
> >>>>>>>> might be a good idea. I don't think we should return actual handles on
> >>>>>>>> the threads, but maybe a list of the thread names rather than a single
> >>>>>>>> number of currently alive threads.
> >>>>>>>>
> >>>>>>>> Since we seem to think it would be difficult if not impossible to keep
> >>>>>>>> track of the number of running stream threads, we should apply the same
> >>>>>>>> reasoning to the names and not assume the user can/will keep track of
> >>>>>>>> every thread returned by addStreamThread() or removeStreamThread().
> >>>>>>>> Users should generally take any required action immediately after
> >>>>>>>> adding/removing the thread -- e.g. deregistering the thread metrics --
> >>>>>>>> but it might still be useful to provide a convenience method listing
> >>>>>>>> all of the current threads.
> >>>>>>>>
> >>>>>>>> And of course you could still get the number of threads easily by
> >>>>>>>> invoking size() on the returned list (or ordered set?).
> >>>>>>>>
> >>>>>>>> On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Thank you again for the feedback Sophie!
> >>>>>>>>>
> >>>>>>>>> As I tried to point out in my previous e-mail, removing a stream thread
> >>>>>>>>> from a Kafka Streams client that does not have alive stream threads is
> >>>>>>>>> nothing exceptional for the client per se. However, it can become
> >>>>>>>>> exceptional within the context of the user. For example, if users want
> >>>>>>>>> to remove a stream thread from a client without alive stream threads
> >>>>>>>>> because one of their metrics says so, then this is exceptional in the
> >>>>>>>>> context of that user metric, not in the context of the Kafka Streams
> >>>>>>>>> client. In that case, users should throw an exception and handle it.
> >>>>>>>>>
> >>>>>>>>> Regarding returning null, I do not like to return null because from a
> >>>>>>>>> development point of view there is no distinction between returning null
> >>>>>>>>> because we have a bug in the code or returning null because there are no
> >>>>>>>>> alive stream threads. Additionally, Optional<String> makes it more
> >>>>>>>>> explicit that the result could also be empty.
> >>>>>>>>>
> >>>>>>>>> Thank you for the alternative method names! However, with the names you
> >>>>>>>>> propose it is not immediately clear that the method returns a number of
> >>>>>>>>> stream threads. They rather suggest that the method returns a list of
> >>>>>>>>> handles to the stream threads. I chose to use "aliveStreamThreads" to be
> >>>>>>>>> consistent with the client-level metric "alive-stream-threads", which
> >>>>>>>>> reports the same number of stream threads that
> >>>>>>>>> numberOfAliveStreamThreads() should report. If others also think that
> >>>>>>>>> the proposed name in the KIP is too clumsy, I am open to renaming it,
> >>>>>>>>> though.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 08.09.20 20:12, Sophie Blee-Goldman wrote:
> >>>>>>>>>>> it's never a good sign when the discussion moves into the vote thread
> >>>>>>>>>>
> >>>>>>>>>> Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads
> >>>>>>>>>> strikes again. Thanks for redirecting me Bruno
> >>>>>>>>>>
> >>>>>>>>>> I suppose it's unfair to expect the callers to keep perfect track of the
> >>>>>>>>>> current number of stream threads, but it also seems like you shouldn't be
> >>>>>>>>>> calling removeStreamThread() when there are no threads left. Either you're
> >>>>>>>>>> just haphazardly removing threads and could unintentionally slip into a
> >>>>>>>>>> state of no running threads without realizing it, or more realistically,
> >>>>>>>>>> you're carefully removing threads based on some metric(s) that convey
> >>>>>>>>>> whether the system is over- or under-provisioned. If your metrics say
> >>>>>>>>>> you're over-provisioned but there's not one thread running, well, that
> >>>>>>>>>> certainly sounds exceptional to me. Or you might be right in that the
> >>>>>>>>>> cluster is over-provisioned but have just been directing the
> >>>>>>>>>> removeStreamThread() and addStreamThread() calls to instances at random,
> >>>>>>>>>> and end up with one massive instance and one with no threads at all.
> >>>>>>>>>> Again, this probably merits some human intervention (or system redesign).
> >>>>>>>>>>
> >>>>>>>>>> That said, I don't think there's any real harm to just returning null in
> >>>>>>>>>> this case, but I hope that users would pay attention to this since it
> >>>>>>>>>> seems likely to indicate something has gone seriously wrong. I suppose
> >>>>>>>>>> Optional<String> would be a reasonable compromise.
> >>>>>>>>>>
> >>>>>>>>>> As for the method name, what about activeStreamThreads() or
> >>>>>>>>>> liveStreamThreads()?
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Sep 7, 2020 at 1:45 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi John,
> >>>>>>>>>>>
> >>>>>>>>>>> I agree with you except for checking null. I would rather use
> >>>>>>>>>>> Optional<String> as the return type of both methods.
> >>>>>>>>>>>
> >>>>>>>>>>> I changed the subject from [VOTE] to [DISCUSS] so that we can follow up
> >>>>>>>>>>> in the discussion thread.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>> On 04.09.20 23:12, John Roesler wrote:
> >>>>>>>>>>>> Hi Sophie,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Uh, oh, it's never a good sign when the discussion moves
> >>>>>>>>>>>> into the vote thread :)
> >>>>>>>>>>>>
> >>>>>>>>>>>> I agree with you, it seems like a good touch for
> >>>>>>>>>>>> removeStreamThread() to return the name of the thread that
> >>>>>>>>>>>> got removed, rather than a boolean flag. Maybe the return
> >>>>>>>>>>>> value would be `null` if there is no thread to remove.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If we go that way, I'd suggest that addStreamThread() also
> >>>>>>>>>>>> return the name of the newly created thread, or null if no
> >>>>>>>>>>>> thread can be created right now.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm not completely sure if I think that callers of this
> >>>>>>>>>>>> method would know exactly how many threads there are. Sure,
> >>>>>>>>>>>> if a human being is sitting there looking at the metrics or
> >>>>>>>>>>>> logs and decides to call the method, it would work out, but
> >>>>>>>>>>>> I'd expect this kind of method to find its way into
> >>>>>>>>>>>> automated tooling that reacts to things like current system
> >>>>>>>>>>>> load or resource saturation. Those kinds of toolchains often
> >>>>>>>>>>>> are part of a distributed system, and it's probably not that
> >>>>>>>>>>>> easy to guarantee that the thread count they observe is
> >>>>>>>>>>>> fully consistent with the number of threads that are
> >>>>>>>>>>>> actually running. Therefore, an in-situ `int
> >>>>>>>>>>>> numStreamThreads()` method might not be a bad idea. Then
> >>>>>>>>>>>> again, it seems sort of optional. A caller can catch an
> >>>>>>>>>>>> exception or react to a `null` return value just the same
> >>>>>>>>>>>> either way. Having both add/remove methods behave similarly
> >>>>>>>>>>>> is probably more valuable.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> -John
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>> Hey, sorry for the late reply, I just have one minor suggestion. Since
> >>>>>>>>>>>>> we don't make any guarantees about which thread gets removed or allow
> >>>>>>>>>>>>> the user to specify, I think we should return either the index or full
> >>>>>>>>>>>>> name of the thread that does get removed by removeStreamThread().
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I know you just updated the KIP to return true/false if there are/aren't
> >>>>>>>>>>>>> any threads to be removed, but I think this would be more appropriate as
> >>>>>>>>>>>>> an exception than as a return type. I think it's reasonable to expect
> >>>>>>>>>>>>> users to have some sense of how many threads are remaining, and not try
> >>>>>>>>>>>>> to remove a thread when there is none left. To me, that indicates
> >>>>>>>>>>>>> something wrong with the user application code and should be treated as
> >>>>>>>>>>>>> an exceptional case.
> >>>>>>>>>>>>> I don't think the same code clarity argument applies here as to the
> >>>>>>>>>>>>> addStreamThread() case, as there's no reason for an application to be
> >>>>>>>>>>>>> looping and retrying removeStreamThread() since if that fails, it's
> >>>>>>>>>>>>> because there are no threads left and thus it will continue to always
> >>>>>>>>>>>>> fail. And if the user actually wants to shut down all threads, they
> >>>>>>>>>>>>> should just close the whole application rather than call
> >>>>>>>>>>>>> removeStreamThread() in a loop.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> While I generally think it should be straightforward for users to track
> >>>>>>>>>>>>> how many stream threads they have running, maybe it would be nice to add
> >>>>>>>>>>>>> a small utility method that does this for them. Something like
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // Returns the number of currently alive threads
> >>>>>>>>>>>>> int runningStreamThreads();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> +1 (binding)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 9/3/20 6:16 AM, Bruno Cadonna wrote:
> >>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would like to start the voting on KIP-663 that proposes to add methods
> >>>>>>>>>>>>>>> to the Kafka Streams client to add and remove stream threads during
> >>>>>>>>>>>>>>> execution.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>


-- 
-- Guozhang
