Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-19 Thread Luke Chen
Hi Seung-chan,

Thanks for the update.
Making the default value of the duration `Long.MAX_VALUE` for consistency
makes sense to me.

I have no more comments.

Thank you.
Luke

On Sat, Feb 19, 2022 at 5:12 PM Seung-chan Ahn 
wrote:

> Hi Luke,
>
> Thanks for the comment.
>
> 3. In that case, I assumed we'd make it behave in the same way of
> `close()`, and that eventually means the `close(Long.MAX_VALUE)`. It'd be
> better to specify the default value in the `CloseOptions` class. Let me
> update the KIP -> Done
>
> Thank you.
> Seung-chan
>
> On Sat, Feb 19, 2022 at 4:42 PM Luke Chen  wrote:
>
> > Hi Seung-chan,
> >
> > Thanks for the update.
> >
> > One more comment:
> > 3. Could you explain if user doesn't provide the `timeout` value in
> > `CloseOptions`. What will happen? I saw the default value for `timeout`
> > variable in `CloseOptions` is `null`. What does a null `timeout` mean?
> >
> > Thank you.
> > Luke
> >
> > On Sat, Feb 19, 2022 at 3:31 PM Seung-chan Ahn 
> > wrote:
> >
> > > Hi Luke,
> > >
> > > Thanks for reviewing this KIP!
> > >
> > > 1. You are right, I'm only adding one method with `CloseOptions`. Let
> me
> > > comment in the KIP! -> Done
> > > 2. I think it makes more sense to have `Duration` type option which is
> > > aligned to the existing public method. Let me update the KIP! -> Done
> > >
> > > Seung-chan
> > >
> > > On Fri, Feb 18, 2022 at 3:21 PM Luke Chen  wrote:
> > >
> > > > Hi Seung-chan,
> > > >
> > > > Thanks for the KIP!
> > > > I like the optimization to have a close method to force leave group.
> > > > Some minor comments:
> > > > 1. in the "public interfaces", there are many `close` method listed,
> > but
> > > I
> > > > believe you only want to introduce the one with `CloseOptions`,
> right?
> > > > Could you add some comment to it to point it out this is what you
> added
> > > in
> > > > this KIP?
> > > > 2. In the CloseOptions class variable, I saw there's a `timeoutMs` as
> > > type
> > > > `Integer`. Could you explain why we use `Integer` here, instead of
> > `int`,
> > > > or `Duration`?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Tue, Feb 15, 2022 at 11:19 PM Seung-chan Ahn <
> > dev.issea1...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > >
> > > > > Thanks a lot for sharing your idea and guiding me.
> > > > >
> > > > >
> > > > > Thanks to your support, we've reached this voting:
> > > > >
> > > > > https://lists.apache.org/thread/toq5pg799ctd7lwdcd6g7zk6xn73h26r
> > > > >
> > > > >
> > > > > Seung-chan
> > > > >
> > > > > On Wed, Jan 12, 2022 at 10:50 PM Seung-chan Ahn <
> > > dev.issea1...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi team,
> > > > > >
> > > > > > Here's the new KIP
> > > > > > <
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group
> > > > > >
> > > > > >  for this issue <
> https://issues.apache.org/jira/browse/KAFKA-13217
> > >.
> > > > > >
> > > > > > The title says pretty much what this KIP is for. Even though it's
> > my
> > > > > first
> > > > > > draft, as A. Sophie Blee-Goldman has written rich descriptions
> and
> > > > > already
> > > > > > the solutions in the issue thread, I've enjoyed following up on
> the
> > > > idea.
> > > > > >
> > > > > > Please feel free to review on any point!
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-19 Thread Seung-chan Ahn
Hi Luke,

Thanks for the comment.

3. In that case, I assumed we'd make it behave the same way as `close()`,
which eventually means `close(Long.MAX_VALUE)`. It'd be better to specify
the default value in the `CloseOptions` class. Let me
update the KIP -> Done
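
As a rough illustration of point 3, here is a minimal sketch of an options
class whose timeout defaults to `Long.MAX_VALUE` milliseconds instead of
`null`. The class name `CloseOptionsSketch` and its accessors are
hypothetical stand-ins made up for this example. They are not taken from
the KIP text or from the Kafka code base.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for the options class discussed in this thread.
final class CloseOptionsSketch {
    // Assumed default: omitting the timeout behaves like the plain close(),
    // which the thread describes as close(Long.MAX_VALUE).
    private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
    private boolean leaveGroup = false;

    CloseOptionsSketch timeout(final Duration timeout) {
        // Rejecting null keeps "no timeout given" unambiguous.
        this.timeout = Objects.requireNonNull(timeout, "timeout must not be null");
        return this;
    }

    CloseOptionsSketch leaveGroup(final boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }

    Duration timeout() {
        return timeout;
    }

    boolean leaveGroup() {
        return leaveGroup;
    }

    public static void main(final String[] args) {
        final CloseOptionsSketch defaults = new CloseOptionsSketch();
        System.out.println("default timeout ms: " + defaults.timeout().toMillis());
        System.out.println("default leaveGroup: " + defaults.leaveGroup());
    }
}
```

With a non-null default, a caller who only sets the leave-group flag still
gets a well-defined wait, and nobody has to decide what `null` means.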

Thank you.
Seung-chan

On Sat, Feb 19, 2022 at 4:42 PM Luke Chen  wrote:

> Hi Seung-chan,
>
> Thanks for the update.
>
> One more comment:
> 3. Could you explain if user doesn't provide the `timeout` value in
> `CloseOptions`. What will happen? I saw the default value for `timeout`
> variable in `CloseOptions` is `null`. What does a null `timeout` mean?
>
> Thank you.
> Luke
>
> On Sat, Feb 19, 2022 at 3:31 PM Seung-chan Ahn 
> wrote:
>
> > Hi Luke,
> >
> > Thanks for reviewing this KIP!
> >
> > 1. You are right, I'm only adding one method with `CloseOptions`. Let me
> > comment in the KIP! -> Done
> > 2. I think it makes more sense to have `Duration` type option which is
> > aligned to the existing public method. Let me update the KIP! -> Done
> >
> > Seung-chan
> >
> > On Fri, Feb 18, 2022 at 3:21 PM Luke Chen  wrote:
> >
> > > Hi Seung-chan,
> > >
> > > Thanks for the KIP!
> > > I like the optimization to have a close method to force leave group.
> > > Some minor comments:
> > > 1. in the "public interfaces", there are many `close` method listed,
> but
> > I
> > > believe you only want to introduce the one with `CloseOptions`, right?
> > > Could you add some comment to it to point it out this is what you added
> > in
> > > this KIP?
> > > 2. In the CloseOptions class variable, I saw there's a `timeoutMs` as
> > type
> > > `Integer`. Could you explain why we use `Integer` here, instead of
> `int`,
> > > or `Duration`?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Tue, Feb 15, 2022 at 11:19 PM Seung-chan Ahn <
> dev.issea1...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > >
> > > > Thanks a lot for sharing your idea and guiding me.
> > > >
> > > >
> > > > Thanks to your support, we've reached this voting:
> > > >
> > > > https://lists.apache.org/thread/toq5pg799ctd7lwdcd6g7zk6xn73h26r
> > > >
> > > >
> > > > Seung-chan
> > > >
> > > > On Wed, Jan 12, 2022 at 10:50 PM Seung-chan Ahn <
> > dev.issea1...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi team,
> > > > >
> > > > > Here's the new KIP
> > > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group
> > > > >
> > > > >  for this issue  >.
> > > > >
> > > > > The title says pretty much what this KIP is for. Even though it's
> my
> > > > first
> > > > > draft, as A. Sophie Blee-Goldman has written rich descriptions and
> > > > already
> > > > > the solutions in the issue thread, I've enjoyed following up on the
> > > idea.
> > > > >
> > > > > Please feel free to review on any point!
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-18 Thread Luke Chen
Hi Seung-chan,

Thanks for the update.

One more comment:
3. Could you explain what happens if the user doesn't provide the `timeout`
value in `CloseOptions`? I saw the default value for the `timeout`
variable in `CloseOptions` is `null`. What does a null `timeout` mean?

Thank you.
Luke

On Sat, Feb 19, 2022 at 3:31 PM Seung-chan Ahn 
wrote:

> Hi Luke,
>
> Thanks for reviewing this KIP!
>
> 1. You are right, I'm only adding one method with `CloseOptions`. Let me
> comment in the KIP! -> Done
> 2. I think it makes more sense to have `Duration` type option which is
> aligned to the existing public method. Let me update the KIP! -> Done
>
> Seung-chan
>
> On Fri, Feb 18, 2022 at 3:21 PM Luke Chen  wrote:
>
> > Hi Seung-chan,
> >
> > Thanks for the KIP!
> > I like the optimization to have a close method to force leave group.
> > Some minor comments:
> > 1. in the "public interfaces", there are many `close` method listed, but
> I
> > believe you only want to introduce the one with `CloseOptions`, right?
> > Could you add some comment to it to point it out this is what you added
> in
> > this KIP?
> > 2. In the CloseOptions class variable, I saw there's a `timeoutMs` as
> type
> > `Integer`. Could you explain why we use `Integer` here, instead of `int`,
> > or `Duration`?
> >
> > Thank you.
> > Luke
> >
> > On Tue, Feb 15, 2022 at 11:19 PM Seung-chan Ahn  >
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > >
> > > Thanks a lot for sharing your idea and guiding me.
> > >
> > >
> > > Thanks to your support, we've reached this voting:
> > >
> > > https://lists.apache.org/thread/toq5pg799ctd7lwdcd6g7zk6xn73h26r
> > >
> > >
> > > Seung-chan
> > >
> > > On Wed, Jan 12, 2022 at 10:50 PM Seung-chan Ahn <
> dev.issea1...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi team,
> > > >
> > > > Here's the new KIP
> > > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group
> > > >
> > > >  for this issue .
> > > >
> > > > The title says pretty much what this KIP is for. Even though it's my
> > > first
> > > > draft, as A. Sophie Blee-Goldman has written rich descriptions and
> > > already
> > > > the solutions in the issue thread, I've enjoyed following up on the
> > idea.
> > > >
> > > > Please feel free to review on any point!
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-18 Thread Seung-chan Ahn
Hi Luke,

Thanks for reviewing this KIP!

1. You are right, I'm only adding one method with `CloseOptions`. Let me
comment in the KIP! -> Done
2. I think it makes more sense to have a `Duration`-typed option, which
aligns with the existing public method. Let me update the KIP! -> Done
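
To make point 2 concrete, here is a small sketch of how a `Duration`
option might be turned into the millisecond budget a close routine works
with. The helper name `toTimeoutMs`, the rejection of negative values, and
the clamping of oversized durations are all assumptions made for this
example, not behavior confirmed by the KIP.

```java
import java.time.Duration;

// Hypothetical helper; not part of the KIP or of Kafka.
final class TimeoutTypeSketch {

    static long toTimeoutMs(final Duration timeout) {
        if (timeout == null) {
            throw new IllegalArgumentException("timeout must not be null");
        }
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        try {
            return timeout.toMillis();
        } catch (final ArithmeticException overflow) {
            // Durations too large for a long of milliseconds are clamped
            // (an assumption made for this sketch).
            return Long.MAX_VALUE;
        }
    }

    public static void main(final String[] args) {
        System.out.println(toTimeoutMs(Duration.ofSeconds(10)));
    }
}
```

Compared with a nullable `Integer timeoutMs`, a `Duration` carries its own
unit and matches the shape of the existing `close(Duration)` method.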

Seung-chan

On Fri, Feb 18, 2022 at 3:21 PM Luke Chen  wrote:

> Hi Seung-chan,
>
> Thanks for the KIP!
> I like the optimization to have a close method to force leave group.
> Some minor comments:
> 1. in the "public interfaces", there are many `close` method listed, but I
> believe you only want to introduce the one with `CloseOptions`, right?
> Could you add some comment to it to point it out this is what you added in
> this KIP?
> 2. In the CloseOptions class variable, I saw there's a `timeoutMs` as type
> `Integer`. Could you explain why we use `Integer` here, instead of `int`,
> or `Duration`?
>
> Thank you.
> Luke
>
> On Tue, Feb 15, 2022 at 11:19 PM Seung-chan Ahn 
> wrote:
>
> > Hi Guozhang,
> >
> >
> > Thanks a lot for sharing your idea and guiding me.
> >
> >
> > Thanks to your support, we've reached this voting:
> >
> > https://lists.apache.org/thread/toq5pg799ctd7lwdcd6g7zk6xn73h26r
> >
> >
> > Seung-chan
> >
> > On Wed, Jan 12, 2022 at 10:50 PM Seung-chan Ahn  >
> > wrote:
> >
> > > Hi team,
> > >
> > > Here's the new KIP
> > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group
> > >
> > >  for this issue .
> > >
> > > The title says pretty much what this KIP is for. Even though it's my
> > first
> > > draft, as A. Sophie Blee-Goldman has written rich descriptions and
> > already
> > > the solutions in the issue thread, I've enjoyed following up on the
> idea.
> > >
> > > Please feel free to review on any point!
> > >
> >
>


Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-17 Thread Luke Chen
Hi Seung-chan,

Thanks for the KIP!
I like the optimization of having a close method that forces the member to
leave the group.
Some minor comments:
1. In the "public interfaces" section, many `close` methods are listed, but
I believe you only want to introduce the one with `CloseOptions`, right?
Could you add a comment to point out that this is the one you added in
this KIP?
2. In the `CloseOptions` class, I saw there's a `timeoutMs` variable of type
`Integer`. Could you explain why we use `Integer` here, instead of `int`
or `Duration`?

Thank you.
Luke

On Tue, Feb 15, 2022 at 11:19 PM Seung-chan Ahn 
wrote:

> Hi Guozhang,
>
>
> Thanks a lot for sharing your idea and guiding me.
>
>
> Thanks to your support, we've reached this voting:
>
> https://lists.apache.org/thread/toq5pg799ctd7lwdcd6g7zk6xn73h26r
>
>
> Seung-chan
>
> On Wed, Jan 12, 2022 at 10:50 PM Seung-chan Ahn 
> wrote:
>
> > Hi team,
> >
> > Here's the new KIP
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group
> >
> >  for this issue .
> >
> > The title says pretty much what this KIP is for. Even though it's my
> first
> > draft, as A. Sophie Blee-Goldman has written rich descriptions and
> already
> > the solutions in the issue thread, I've enjoyed following up on the idea.
> >
> > Please feel free to review on any point!
> >
>


Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-15 Thread Seung-chan Ahn
Hi Guozhang,


Thanks a lot for sharing your idea and guiding me.


Thanks to your support, we've reached the vote:

https://lists.apache.org/thread/toq5pg799ctd7lwdcd6g7zk6xn73h26r


Seung-chan

On Wed, Jan 12, 2022 at 10:50 PM Seung-chan Ahn 
wrote:

> Hi team,
>
> Here's the new KIP
> 
>  for this issue .
>
> The title says pretty much what this KIP is for. Even though it's my first
> draft, as A. Sophie Blee-Goldman has written rich descriptions and already
> the solutions in the issue thread, I've enjoyed following up on the idea.
>
> Please feel free to review on any point!
>


Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-13 Thread Guozhang Wang
Thanks Seung-chan!

Re: "As I read the code, we can set a consumer to leave the group while
shutting down the thread, using the
`StreamThread#requestLeaveGroupDuringShutdown` method. Is it enough to call
that method on every thread created in `KafkaStreams` sometime before we
call `StreamThread#shutdown`?"

I think you are right. We can bypass API changes in consumers for this KIP.

Re: "I updated the `Public Interfaces` section in the KIP to specify the
full signature. As you can see, I suggested putting the `CloseOptions` class
in the `KafkaStreams` class. I feel like the name would be too general for
a separate file."

Re-read the updated KIP, looks good to me!


Guozhang

On Fri, Feb 11, 2022 at 11:21 PM Seung-chan Ahn 
wrote:

> (re-sending with the better syntax of quotation)
>
> Hello Guozhang,
>
> Thanks for your rich comment! I'd carefully read through all you mentioned.
> I updated the `Public Interfaces` section of KIP and this is what I think:
>
> @Guozhang: Today we use the timeout to try to tackle all three cases, but
> > ideally we want the client to submit extra information to help
> distinguish
> > them. I.e. we just use timeout for case 1) only, while we use separate
> > mechanisms to differentiate 2) and 3) from it. Personally I think we
> could
> > consider having an augmented leave-group (or maybe in the long run, we
> can
> > merge that RPC as part of heartbeat) with a flag indicating 2) or 3),
> while
> > just relying on the timeout for case 1).
>
>
> I know you proposed this idea in a wider scope than this KIP, but it'd be
> worth keeping the discussion. I've thought about the idea of `augmented
> leave-group with a flag indicating 2) or 3)`. In the case that a bouncing
> consumer requested, with a flag, to leave the group, and unfortunately, it
> failed to restart, I guess the group’s coordinator still needs to drop the
> consumer after some while. And by its nature, the coordinator would wait
> for the consumer till the timeout reached. Eventually, it seems like not
> really different from the case the consumer restarts and prays the timeout
> is enough. In my very naive thought, `augmented leave-group with a flag
> indicating 2)` is not supposed to be a request to leave the group but the
> one for being exempted from the timeout. So I’d rather consider having a
> request to extend the timeout for one time instead.
>
> @Guozhang: 1. Regarding the API change, I feel just doing that on the
> > streams side is not enough since by the end of the day we still need the
> > consumer to incorporate it (today it's via a static config and hence we
> > cannot just dynamically change the config).
>
>
> As I read the code, we can set a consumer to leave the group while shutting
> down the thread, using `StreamThread#requestLeaveGroupDuringShutdown`
> method. Is it enough to call that method on every thread created in
> `KafkaStream` sometime before we call `StreamThread#shutdown`?
>
> @Guozhang: 2. Regarding the API itself, I want to see a more concrete
> > proposal that contains the full signature, e.g.
> does `.closeAndLeaveGroup()`
> > include the timeout param as well, etc? My very subjective preference is
> to
> > not differentiate by the function name in case we may want to augment the
> > close function in the future, which would explode the function names :P
> > Instead maybe we can just overload the `close()` function again but with
> a
> > control object, that includes 1) timeout, 2) leave-group flag, and hence
> > can also extend to include more variables just in case. Would like to
> hear
> > others' thoughts as well.
>
>
> I personally prefer the control object pattern too. It will save us from
> the "telescoping constructors" pattern. Also, I found that we already
> introduced this way on `AdminClient`. It sounds consistent to have the same
> pattern in this case.
>
> I updated the `Public Interfaces` section in the KIP to specify full
> signature. As you can see, I suggested putting the `CloseOptions` class in
> the `KafkaStream` class. I feel like it'd be too general name to put it in
> a separate file.
>
> I’m fully open :) Feel free to oppose any.
>
> On Mon, Feb 7, 2022 at 12:50 PM Guozhang Wang  wrote:
>
> > Hello Seung-chan,
> >
> > Thanks for the KIP writeup and summary! I made a pass on it and want to
> > share some of my thoughts:
> >
> > On the very high level, we want to be able to effectively differentiate
> > several cases as follows:
> >
> > 1) There's a network partition / soft failure hence clients cannot reach
> > the broker, temporarily: here we want to give some time to see if the
> > clients can reconnect back, and hence the timeout makes sense.
> > 2) The client is being bounced, i.e. a shutdown followed by a restart:
> here
> > we do not want to trigger any rebalance, but today we can only hope that
> > the timeout is long enough to cover that bounce window.
> > 3) The client is shutdown and won't be back: here we want to trigger the
> > rebalance immediately, 

Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-11 Thread Seung-chan Ahn
(re-sending with the better syntax of quotation)

Hello Guozhang,

Thanks for your rich comment! I've carefully read through everything you
mentioned. I updated the `Public Interfaces` section of the KIP, and this is
what I think:

@Guozhang: Today we use the timeout to try to tackle all three cases, but
> ideally we want the client to submit extra information to help distinguish
> them. I.e. we just use timeout for case 1) only, while we use separate
> mechanisms to differentiate 2) and 3) from it. Personally I think we could
> consider having an augmented leave-group (or maybe in the long run, we can
> merge that RPC as part of heartbeat) with a flag indicating 2) or 3), while
> just relying on the timeout for case 1).


I know you proposed this idea in a wider scope than this KIP, but it's
worth continuing the discussion. I've thought about the idea of an
`augmented leave-group with a flag indicating 2) or 3)`. Suppose a bouncing
consumer asks, with the flag, to leave the group but then fails to restart.
I guess the group’s coordinator still needs to drop the consumer after a
while, and by its nature the coordinator would wait for the consumer until
the timeout is reached. In the end, that seems not really different from
the case where the consumer restarts and hopes the timeout is long enough.
In my very naive view, an `augmented leave-group with a flag indicating 2)`
is not really a request to leave the group but a request to be exempted
from the timeout. So I’d rather consider having a request that extends the
timeout once instead.

@Guozhang: 1. Regarding the API change, I feel just doing that on the
> streams side is not enough since by the end of the day we still need the
> consumer to incorporate it (today it's via a static config and hence we
> cannot just dynamically change the config).


As I read the code, we can set a consumer to leave the group while shutting
down the thread, using the `StreamThread#requestLeaveGroupDuringShutdown`
method. Is it enough to call that method on every thread created in
`KafkaStreams` sometime before we call `StreamThread#shutdown`?
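
The ordering asked about here can be sketched as follows. `FakeStreamThread`
is a hypothetical stand-in, not the real `StreamThread`. Only the two method
names come from this thread, and the assumption that a thread leaves the
group on shutdown exactly when that was requested beforehand is made for
the example.

```java
import java.util.List;

final class LeaveGroupOnCloseSketch {

    // Hypothetical stand-in for a stream thread.
    static final class FakeStreamThread {
        private boolean leaveGroupRequested = false;
        private boolean leftGroup = false;

        void requestLeaveGroupDuringShutdown() {
            leaveGroupRequested = true;
        }

        void shutdown() {
            // Assumed behavior: leave the group only if asked beforehand.
            leftGroup = leaveGroupRequested;
        }

        boolean leftGroup() {
            return leftGroup;
        }
    }

    // Flag every thread first, then shut all of them down.
    static void close(final List<FakeStreamThread> threads, final boolean leaveGroup) {
        if (leaveGroup) {
            for (final FakeStreamThread thread : threads) {
                thread.requestLeaveGroupDuringShutdown();
            }
        }
        for (final FakeStreamThread thread : threads) {
            thread.shutdown();
        }
    }

    public static void main(final String[] args) {
        final List<FakeStreamThread> threads =
            List.of(new FakeStreamThread(), new FakeStreamThread());
        close(threads, true);
        for (final FakeStreamThread thread : threads) {
            System.out.println("left group: " + thread.leftGroup());
        }
    }
}
```

Flagging all threads before the first shutdown call avoids a window in
which some threads stop without having seen the request.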

@Guozhang: 2. Regarding the API itself, I want to see a more concrete
> proposal that contains the full signature, e.g. does `.closeAndLeaveGroup()`
> include the timeout param as well, etc? My very subjective preference is to
> not differentiate by the function name in case we may want to augment the
> close function in the future, which would explode the function names :P
> Instead maybe we can just overload the `close()` function again but with a
> control object, that includes 1) timeout, 2) leave-group flag, and hence
> can also extend to include more variables just in case. Would like to hear
> others' thoughts as well.


I personally prefer the control object pattern too. It will save us from
the "telescoping constructors" pattern. Also, I found that we already use
this approach in `AdminClient`. It would be consistent to use the same
pattern in this case.
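
To show what the control object buys us, here is a sketch in which the
older overloads delegate to a single options-based `close`. The class
`StreamsCloseSketch`, its nested `Options` type, and the summary string it
returns are hypothetical and exist only for this illustration.

```java
import java.time.Duration;

// Hypothetical stand-in for a class with several close() overloads.
final class StreamsCloseSketch {

    static final class Options {
        private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
        private boolean leaveGroup = false;

        Options timeout(final Duration timeout) {
            this.timeout = timeout;
            return this;
        }

        Options leaveGroup(final boolean leaveGroup) {
            this.leaveGroup = leaveGroup;
            return this;
        }
    }

    // Existing-style overloads funnel into the options-based one.
    String close() {
        return close(new Options());
    }

    String close(final Duration timeout) {
        return close(new Options().timeout(timeout));
    }

    // A future setting becomes a new Options field, not a new overload.
    String close(final Options options) {
        return "timeoutMs=" + options.timeout.toMillis()
            + ", leaveGroup=" + options.leaveGroup;
    }

    public static void main(final String[] args) {
        final StreamsCloseSketch streams = new StreamsCloseSketch();
        System.out.println(streams.close(new Options().leaveGroup(true)));
    }
}
```

A telescoping alternative would need `close(Duration, boolean)` now and one
more signature for every later setting.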

I updated the `Public Interfaces` section in the KIP to specify the full
signature. As you can see, I suggested putting the `CloseOptions` class in
the `KafkaStreams` class. I feel like the name would be too general for a
separate file.

I’m fully open :) Feel free to object to anything.

On Mon, Feb 7, 2022 at 12:50 PM Guozhang Wang  wrote:

> Hello Seung-chan,
>
> Thanks for the KIP writeup and summary! I made a pass on it and want to
> share some of my thoughts:
>
> On the very high level, we want to be able to effectively differentiate
> several cases as follows:
>
> 1) There's a network partition / soft failure hence clients cannot reach
> the broker, temporarily: here we want to give some time to see if the
> clients can reconnect back, and hence the timeout makes sense.
> 2) The client is being bounced, i.e. a shutdown followed by a restart: here
> we do not want to trigger any rebalance, but today we can only hope that
> the timeout is long enough to cover that bounce window.
> 3) The client is shutdown and won't be back: here we want to trigger the
> rebalance immediately, but today we'd have to wait for the timeout value.
>
> Today we use the timeout to try to tackle all three cases, but ideally we
> want the client to submit extra information to help distinguish them. I.e.
> we just use timeout for case 1) only, while we use separate mechanisms to
> differentiate 2) and 3) from it. Personally I think we could consider
> having an augmented leave-group (or maybe in the long run, we can merge
> that RPC as part of heartbeat) with a flag indicating 2) or 3), while just
> relying on the timeout for case 1).
>
> But to consider a narrower scope for this KIP that does not touch on
> protocol changes, I think just differentiate 2/3) by not sending
> leave-group for 2) v.s. sending leave-group for 3) is sufficient.
>
> As for the KIP itself, I have a few minor comments:
>
> 1. Regarding the API change, I feel just doing that on the streams side is
> not enough 

Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-11 Thread Seung-chan Ahn
Hello Guozhang,

Thanks for your rich comment! I've carefully read through everything you
mentioned. I updated the `Public Interfaces` section of the KIP, and this is
what I think:

> @Guozhang: Today we use the timeout to try to tackle all three cases, but
ideally we want the client to submit extra information to help distinguish
them. I.e. we just use timeout for case 1) only, while we use separate
mechanisms to differentiate 2) and 3) from it. Personally I think we could
consider having an augmented leave-group (or maybe in the long run, we can
merge that RPC as part of heartbeat) with a flag indicating 2) or 3), while
just relying on the timeout for case 1).

I know you proposed this idea in a wider scope than this KIP, but it's
worth continuing the discussion. I've thought about the idea of an
`augmented leave-group with a flag indicating 2) or 3)`. Suppose a bouncing
consumer asks, with the flag, to leave the group but then fails to restart.
I guess the group’s coordinator still needs to drop the consumer after a
while, and by its nature the coordinator would wait for the consumer until
the timeout is reached. In the end, that seems not really different from
the case where the consumer restarts and hopes the timeout is long enough.
In my very naive view, an `augmented leave-group with a flag indicating 2)`
is not really a request to leave the group but a request to be exempted
from the timeout. So I’d rather consider having a request that extends the
timeout once instead.

> @Guozhang: 1. Regarding the API change, I feel just doing that on the
streams side is not enough since by the end of the day we still need the
consumer to incorporate it (today it's via a static config and hence we
cannot just dynamically change the config).

As I read the code, we can set a consumer to leave the group while shutting
down the thread, using the `StreamThread#requestLeaveGroupDuringShutdown`
method. Is it enough to call that method on every thread created in
`KafkaStreams` sometime before we call `StreamThread#shutdown`?

> @Guozhang: 2. Regarding the API itself, I want to see a more concrete
proposal that contains the full signature, e.g. does `.closeAndLeaveGroup()`
include the timeout param as well, etc? My very subjective preference is to
not differentiate by the function name in case we may want to augment the
close function in the future, which would explode the function names :P
Instead maybe we can just overload the `close()` function again but with a
control object, that includes 1) timeout, 2) leave-group flag, and hence
can also extend to include more variables just in case. Would like to hear
others' thoughts as well.

I personally prefer the control object pattern too. It will save us from
the "telescoping constructors" pattern. Also, I found that we already use
this approach in `AdminClient`. It would be consistent to use the same
pattern in this case.

I updated the `Public Interfaces` section in the KIP to specify the full
signature. As you can see, I suggested putting the `CloseOptions` class in
the `KafkaStreams` class. I feel like the name would be too general for a
separate file.

I’m fully open :) Feel free to object to anything.

On Mon, Feb 7, 2022 at 12:50 PM Guozhang Wang  wrote:

> Hello Seung-chan,
>
> Thanks for the KIP writeup and summary! I made a pass on it and want to
> share some of my thoughts:
>
> On the very high level, we want to be able to effectively differentiate
> several cases as follows:
>
> 1) There's a network partition / soft failure hence clients cannot reach
> the broker, temporarily: here we want to give some time to see if the
> clients can reconnect back, and hence the timeout makes sense.
> 2) The client is being bounced, i.e. a shutdown followed by a restart: here
> we do not want to trigger any rebalance, but today we can only hope that
> the timeout is long enough to cover that bounce window.
> 3) The client is shutdown and won't be back: here we want to trigger the
> rebalance immediately, but today we'd have to wait for the timeout value.
>
> Today we use the timeout to try to tackle all three cases, but ideally we
> want the client to submit extra information to help distinguish them. I.e.
> we just use timeout for case 1) only, while we use separate mechanisms to
> differentiate 2) and 3) from it. Personally I think we could consider
> having an augmented leave-group (or maybe in the long run, we can merge
> that RPC as part of heartbeat) with a flag indicating 2) or 3), while just
> relying on the timeout for case 1).
>
> But to consider a narrower scope for this KIP that does not touch on
> protocol changes, I think just differentiate 2/3) by not sending
> leave-group for 2) v.s. sending leave-group for 3) is sufficient.
>
> As for the KIP itself, I have a few minor comments:
>
> 1. Regarding the API change, I feel just doing that on the streams side is
> not enough since by the end of the day we still need the consumer to
> incorporate it (today 

Re: [DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-02-06 Thread Guozhang Wang
Hello Seung-chan,

Thanks for the KIP writeup and summary! I made a pass on it and want to
share some of my thoughts:

At a very high level, we want to be able to effectively differentiate
several cases as follows:

1) There's a network partition / soft failure, hence clients cannot reach
the broker temporarily: here we want to give some time to see if the
clients can reconnect, and hence the timeout makes sense.
2) The client is being bounced, i.e. a shutdown followed by a restart: here
we do not want to trigger any rebalance, but today we can only hope that
the timeout is long enough to cover that bounce window.
3) The client is shut down and won't be back: here we want to trigger the
rebalance immediately, but today we'd have to wait for the timeout value.

Today we use the timeout to try to tackle all three cases, but ideally we
want the client to submit extra information to help distinguish them. I.e.
we just use timeout for case 1) only, while we use separate mechanisms to
differentiate 2) and 3) from it. Personally I think we could consider
having an augmented leave-group (or maybe in the long run, we can merge
that RPC as part of heartbeat) with a flag indicating 2) or 3), while just
relying on the timeout for case 1).

But to consider a narrower scope for this KIP that does not touch on
protocol changes, I think just differentiating 2) and 3), by not sending
leave-group for 2) vs. sending leave-group for 3), is sufficient.
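
The narrower rule can be written down in a few lines. The enum `Intent`
and the method `shouldSendLeaveGroup` are hypothetical names chosen for
this sketch. Case 1) does not appear because there the client never gets
to say anything and the timeout alone decides.

```java
// Hypothetical sketch of the "leave-group only for case 3)" rule.
final class ShutdownIntentSketch {

    enum Intent {
        BOUNCE,     // case 2): shutdown followed by a restart
        PERMANENT   // case 3): shutdown, the client won't be back
    }

    static boolean shouldSendLeaveGroup(final Intent intent) {
        // A bounce stays silent so no rebalance is triggered; a permanent
        // shutdown leaves explicitly so the rebalance starts right away.
        return intent == Intent.PERMANENT;
    }

    public static void main(final String[] args) {
        for (final Intent intent : Intent.values()) {
            System.out.println(intent + " -> " + shouldSendLeaveGroup(intent));
        }
    }
}
```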

As for the KIP itself, I have a few minor comments:

1. Regarding the API change, I feel just doing that on the streams side is
not enough since by the end of the day we still need the consumer to
incorporate it (today it's via a static config and hence we cannot just
dynamically change the config).

2. Regarding the API itself, I want to see a more concrete proposal that
contains the full signature, e.g. does `.closeAndLeaveGroup()` include the
timeout param as well, etc? My very subjective preference is to not
differentiate by the function name in case we may want to augment the close
function in the future, which would explode the function names :P Instead
maybe we can just overload the `close()` function again but with a control
object, that includes 1) timeout, 2) leave-group flag, and hence can also
extend to include more variables just in case. Would like to hear others'
thoughts as well.


Guozhang


On Wed, Jan 12, 2022 at 5:51 AM Seung-chan Ahn 
wrote:

> Hi team,
>
> Here's the new KIP
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group
> >
>  for this issue .
>
> The title says pretty much what this KIP is for. Even though it's my first
> draft, as A. Sophie Blee-Goldman has written rich descriptions and already
> the solutions in the issue thread, I've enjoyed following up on the idea.
>
> Please feel free to review on any point!
>


-- 
-- Guozhang


[DISCUSS] KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group

2022-01-12 Thread Seung-chan Ahn
Hi team,

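Here's the new KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group>
for this issue <https://issues.apache.org/jira/browse/KAFKA-13217>.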

The title says pretty much what this KIP is for. Even though it's my first
draft, A. Sophie Blee-Goldman had already written rich descriptions and the
solutions in the issue thread, so I've enjoyed following up on the idea.

Please feel free to review any point!