Thanks Seung-chan!

Re: "As I read the code, we can set a consumer to leave the group while
shutting down the thread, using the
`StreamThread#requestLeaveGroupDuringShutdown` method. Is it enough to call
that method on every thread created in `KafkaStreams` sometime before we
call `StreamThread#shutdown`?"

I think you are right. We can avoid consumer API changes for this KIP.
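For illustration, here is a minimal, self-contained Java sketch of that
sequence. `StubStreamThread` and `closeAndLeaveGroup` are hypothetical
stand-ins, not the internal `StreamThread` class; only the method names
`requestLeaveGroupDuringShutdown` and `shutdown` come from the discussion.
It flags every thread first and only then shuts them down, so each
simulated consumer "sends" a leave-group as it stops.

```java
import java.util.ArrayList;
import java.util.List;

public class LeaveGroupOnShutdownSketch {

    // Hypothetical stand-in for the internal StreamThread (illustration only).
    static final class StubStreamThread {
        private final String name;
        private boolean leaveGroupRequested = false;
        private boolean sentLeaveGroup = false;

        StubStreamThread(String name) {
            this.name = name;
        }

        // Marks the thread so that its consumer leaves the group on shutdown.
        void requestLeaveGroupDuringShutdown() {
            leaveGroupRequested = true;
        }

        // Simulates shutdown: a leave-group is "sent" only if it was requested.
        void shutdown() {
            sentLeaveGroup = leaveGroupRequested;
        }

        boolean sentLeaveGroup() {
            return sentLeaveGroup;
        }
    }

    // Flags every thread first, then shuts them all down.
    static void closeAndLeaveGroup(List<StubStreamThread> threads) {
        for (StubStreamThread t : threads) {
            t.requestLeaveGroupDuringShutdown();
        }
        for (StubStreamThread t : threads) {
            t.shutdown();
        }
    }

    public static void main(String[] args) {
        List<StubStreamThread> threads = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            threads.add(new StubStreamThread("stream-thread-" + i));
        }
        closeAndLeaveGroup(threads);
        for (StubStreamThread t : threads) {
            System.out.println(t.name + " sent leave-group: " + t.sentLeaveGroup());
        }
    }
}
```

Flagging all threads before shutting any of them down means no thread can
stop without the flag already set.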

Re: "I updated the `Public Interfaces` section in the KIP to specify the full
signature. As you can see, I suggested putting the `CloseOptions` class
inside the `KafkaStreams` class. I feel it would be too general a name to
put in a separate file."

I re-read the updated KIP, and it looks good to me!
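As a rough illustration of the control-object approach, here is a
self-contained Java sketch. `CloseOptionsSketch`, `StubStreamsClient`, and
the default values are hypothetical and only approximate the proposal; they
are not the real `KafkaStreams` API. A close without leave-group models
case 2) (bounce), while `leaveGroup(true)` models case 3) (permanent
shutdown).

```java
import java.time.Duration;

public class CloseOptionsSketch {

    // Control object bundling close() parameters; new fields can be added
    // later without adding new close() overloads.
    public static class CloseOptions {
        private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
        private boolean leaveGroup = false;

        public CloseOptions timeout(final Duration timeout) {
            this.timeout = timeout;
            return this;
        }

        public CloseOptions leaveGroup(final boolean leaveGroup) {
            this.leaveGroup = leaveGroup;
            return this;
        }

        Duration timeout() {
            return timeout;
        }

        boolean leaveGroup() {
            return leaveGroup;
        }
    }

    // Hypothetical stand-in for a streams client; records what close() would do.
    static final class StubStreamsClient {
        boolean sentLeaveGroup = false;
        Duration usedTimeout = null;

        // Timeout-only overload: keeps the "stay in the group" behaviour.
        boolean close(final Duration timeout) {
            return close(new CloseOptions().timeout(timeout));
        }

        // Overload taking the control object.
        boolean close(final CloseOptions options) {
            if (options.timeout().isNegative()) {
                throw new IllegalArgumentException("Timeout can't be negative.");
            }
            usedTimeout = options.timeout();
            // Case 3) permanent shutdown: leave so the rebalance starts now.
            // Case 2) bounce: stay and rely on the timeout covering the restart.
            sentLeaveGroup = options.leaveGroup();
            return true;
        }
    }

    public static void main(String[] args) {
        StubStreamsClient bounced = new StubStreamsClient();
        bounced.close(Duration.ofSeconds(30));

        StubStreamsClient retired = new StubStreamsClient();
        retired.close(new CloseOptions().timeout(Duration.ofSeconds(30)).leaveGroup(true));

        System.out.println("bounce sends leave-group: " + bounced.sentLeaveGroup);
        System.out.println("permanent shutdown sends leave-group: " + retired.sentLeaveGroup);
    }
}
```

Adding a future option would only mean adding a field and a fluent setter
to `CloseOptions`, rather than a new `close...()` variant for every
combination.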


Guozhang

On Fri, Feb 11, 2022 at 11:21 PM Seung-chan Ahn <dev.issea1...@gmail.com>
wrote:

> (re-sending with better quotation formatting)
>
> Hello Guozhang,
>
> Thanks for your detailed comments! I've carefully read through everything
> you mentioned. I updated the `Public Interfaces` section of the KIP, and
> this is what I think:
>
> @Guozhang:
> > Today we use the timeout to try to tackle all three cases, but ideally
> > we want the client to submit extra information to help distinguish
> > them. I.e. we just use timeout for case 1) only, while we use separate
> > mechanisms to differentiate 2) and 3) from it. Personally I think we
> > could consider having an augmented leave-group (or maybe in the long
> > run, we can merge that RPC as part of heartbeat) with a flag indicating
> > 2) or 3), while just relying on the timeout for case 1).
>
>
> I know you proposed this idea in a wider scope than this KIP, but it's
> worth continuing the discussion. I've thought about the idea of an
> `augmented leave-group with a flag indicating 2) or 3)`. Suppose a
> bouncing consumer requests, with the flag, to leave the group, but then
> fails to restart. I guess the group coordinator would still need to drop
> the consumer after a while, and by its nature the coordinator would wait
> for the consumer until the timeout is reached. In the end, that does not
> seem very different from the case where the consumer restarts and hopes
> the timeout is long enough. In my possibly naive view, an
> `augmented leave-group with a flag indicating 2)` is not really a request
> to leave the group but a request to be exempted from the timeout. So I'd
> rather consider a request to extend the timeout one time instead.
>
> @Guozhang:
> > 1. Regarding the API change, I feel just doing that on the streams side
> > is not enough since by the end of the day we still need the consumer to
> > incorporate it (today it's via a static config and hence we cannot just
> > dynamically change the config).
>
>
> As I read the code, we can set a consumer to leave the group while
> shutting down the thread, using the
> `StreamThread#requestLeaveGroupDuringShutdown` method. Is it enough to
> call that method on every thread created in `KafkaStreams` sometime
> before we call `StreamThread#shutdown`?
>
> @Guozhang:
> > 2. Regarding the API itself, I want to see a more concrete proposal
> > that contains the full signature, e.g. does ".closeAndLeaveGroup()"
> > include the timeout param as well, etc? My very subjective preference
> > is to not differentiate by the function name in case we may want to
> > augment the close function in the future, which would explode the
> > function names :P Instead maybe we can just overload the `close()`
> > function again but with a control object, that includes 1) timeout,
> > 2) leave-group flag, and hence can also extend to include more
> > variables just in case. Would like to hear others' thoughts as well.
>
>
> I personally prefer the control-object pattern too. It saves us from the
> "telescoping constructors" anti-pattern. Also, I found that we already
> use this approach in `AdminClient`, so it seems consistent to use the
> same pattern here.
>
> I updated the `Public Interfaces` section in the KIP to specify the full
> signature. As you can see, I suggested putting the `CloseOptions` class
> inside the `KafkaStreams` class. I feel it would be too general a name
> to put in a separate file.
>
> I'm fully open :) Feel free to push back on any of it.
>
> On Mon, Feb 7, 2022 at 12:50 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Seung-chan,
> >
> > Thanks for the KIP writeup and summary! I made a pass on it and want to
> > share some of my thoughts:
> >
> > At a very high level, we want to be able to effectively differentiate
> > the following cases:
> >
> > 1) There's a network partition / soft failure, so clients temporarily
> > cannot reach the broker: here we want to give some time to see if the
> > clients can reconnect, and hence the timeout makes sense.
> > 2) The client is being bounced, i.e. a shutdown followed by a restart:
> > here we do not want to trigger any rebalance, but today we can only
> > hope that the timeout is long enough to cover that bounce window.
> > 3) The client is shut down and won't be back: here we want to trigger
> > the rebalance immediately, but today we'd have to wait for the timeout.
> >
> > Today we use the timeout to try to tackle all three cases, but ideally
> > we want the client to submit extra information to help distinguish
> > them. I.e. we just use timeout for case 1) only, while we use separate
> > mechanisms to differentiate 2) and 3) from it. Personally I think we
> > could consider having an augmented leave-group (or maybe in the long
> > run, we can merge that RPC as part of heartbeat) with a flag indicating
> > 2) or 3), while just relying on the timeout for case 1).
> >
> > But to keep a narrower scope for this KIP that does not touch on
> > protocol changes, I think just differentiating 2) and 3) by not sending
> > a leave-group for 2) vs. sending a leave-group for 3) is sufficient.
> >
> > As for the KIP itself, I have a few minor comments:
> >
> > 1. Regarding the API change, I feel just doing that on the streams side
> > is not enough since by the end of the day we still need the consumer to
> > incorporate it (today it's via a static config and hence we cannot just
> > dynamically change the config).
> >
> > 2. Regarding the API itself, I want to see a more concrete proposal that
> > contains the full signature, e.g. does ".closeAndLeaveGroup()" include
> > the timeout param as well, etc? My very subjective preference is to not
> > differentiate by the function name in case we may want to augment the
> > close function in the future, which would explode the function names :P
> > Instead maybe we can just overload the `close()` function again but with
> > a control object, that includes 1) timeout, 2) leave-group flag, and
> > hence can also extend to include more variables just in case. Would
> > like to hear others' thoughts as well.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jan 12, 2022 at 5:51 AM Seung-chan Ahn <dev.issea1...@gmail.com>
> > wrote:
> >
> > > Hi team,
> > >
> > > Here's the new KIP
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group>
> > > for this issue <https://issues.apache.org/jira/browse/KAFKA-13217>.
> > >
> > > The title says pretty much what this KIP is for. Even though it's my
> > > first draft, I've enjoyed following up on the idea, since A. Sophie
> > > Blee-Goldman had already written detailed descriptions and solutions
> > > in the issue thread.
> > >
> > > Please feel free to review on any point!
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
