Thanks Seung-chan!

Re: "As I read the code, we can set a consumer to leave the group while shutting down the thread, using `StreamThread#requestLeaveGroupDuringShutdown` method. Is it enough to call that method on every thread created in `KafkaStreams` sometime before we call `StreamThread#shutdown`?"
I think you are right. We can bypass API changes in consumers for this KIP.

Re: "I updated the `Public Interfaces` section in the KIP to specify full signature. As you can see, I suggested putting the `CloseOptions` class in the `KafkaStreams` class. I feel like it'd be too general name to put it in a separate file."

Re-read the updated KIP, looks good to me!

Guozhang

On Fri, Feb 11, 2022 at 11:21 PM Seung-chan Ahn <[email protected]> wrote:

> (re-sending with the better syntax of quotation)
>
> Hello Guozhang,
>
> Thanks for your rich comment! I'd carefully read through all you mentioned.
> I updated the `Public Interfaces` section of KIP and this is what I think:
>
> > @Guozhang: Today we use the timeout to try to tackle all three cases, but
> > ideally we want the client to submit extra information to help distinguish
> > them. I.e. we just use timeout for case 1) only, while we use separate
> > mechanisms to differentiate 2) and 3) from it. Personally I think we could
> > consider having an augmented leave-group (or maybe in the long run, we can
> > merge that RPC as part of heartbeat) with a flag indicating 2) or 3), while
> > just relying on the timeout for case 1).
>
> I know you proposed this idea in a wider scope than this KIP, but it'd be
> worth keeping the discussion. I've thought about the idea of `augmented
> leave-group with a flag indicating 2) or 3)`. In the case that a bouncing
> consumer requested, with a flag, to leave the group, and unfortunately, it
> failed to restart, I guess the group’s coordinator still needs to drop the
> consumer after some while. And by its nature, the coordinator would wait
> for the consumer till the timeout reached. Eventually, it seems like not
> really different from the case the consumer restarts and prays the timeout
> is enough. In my very naive thought, `augmented leave-group with a flag
> indicating 2)` is not supposed to be a request to leave the group but the
> one for being exempted from the timeout. So I’d rather consider having a
> request to extend the timeout for one time instead.
>
> > @Guozhang: 1. Regarding the API change, I feel just doing that on the
> > streams side is not enough since by the end of the day we still need the
> > consumer to incorporate it (today it's via a static config and hence we
> > cannot just dynamically change the config).
>
> As I read the code, we can set a consumer to leave the group while shutting
> down the thread, using `StreamThread#requestLeaveGroupDuringShutdown`
> method. Is it enough to call that method on every thread created in
> `KafkaStreams` sometime before we call `StreamThread#shutdown`?
>
> > @Guozhang: 2. Regarding the API itself, I want to see a more concrete
> > proposal that contains the full signature, e.g. does
> > ".closeAndLeaveGroup()" include the timeout param as well, etc? My very
> > subjective preference is to not differentiate by the function name in
> > case we may want to augment the close function in the future, which would
> > explode the function names :P Instead maybe we can just overload the
> > `close()` function again but with a control object, that includes
> > 1) timeout, 2) leave-group flag, and hence can also extend to include
> > more variables just in case. Would like to hear others' thoughts as well.
>
> I personally prefer the control object pattern too. It will save us from
> the "telescoping constructors" pattern. Also, I found that we already
> introduced this way on `AdminClient`. It sounds consistent to have the same
> pattern in this case.
>
> I updated the `Public Interfaces` section in the KIP to specify full
> signature. As you can see, I suggested putting the `CloseOptions` class in
> the `KafkaStreams` class. I feel like it'd be too general name to put it in
> a separate file.
>
> I’m fully open :) Feel free to oppose any.
>
> On Mon, Feb 7, 2022 at 12:50 PM Guozhang Wang <[email protected]> wrote:
>
> > Hello Seung-chan,
> >
> > Thanks for the KIP writeup and summary! I made a pass on it and want to
> > share some of my thoughts:
> >
> > On the very high level, we want to be able to effectively differentiate
> > several cases as follows:
> >
> > 1) There's a network partition / soft failure hence clients cannot reach
> > the broker, temporarily: here we want to give some time to see if the
> > clients can reconnect back, and hence the timeout makes sense.
> > 2) The client is being bounced, i.e. a shutdown followed by a restart:
> > here we do not want to trigger any rebalance, but today we can only hope
> > that the timeout is long enough to cover that bounce window.
> > 3) The client is shut down and won't be back: here we want to trigger the
> > rebalance immediately, but today we'd have to wait for the timeout value.
> >
> > Today we use the timeout to try to tackle all three cases, but ideally we
> > want the client to submit extra information to help distinguish them.
> > I.e. we just use timeout for case 1) only, while we use separate
> > mechanisms to differentiate 2) and 3) from it. Personally I think we
> > could consider having an augmented leave-group (or maybe in the long run,
> > we can merge that RPC as part of heartbeat) with a flag indicating 2) or
> > 3), while just relying on the timeout for case 1).
> >
> > But to consider a narrower scope for this KIP that does not touch on
> > protocol changes, I think just differentiating 2) and 3) by not sending
> > leave-group for 2) vs. sending leave-group for 3) is sufficient.
> >
> > As for the KIP itself, I have a few minor comments:
> >
> > 1. Regarding the API change, I feel just doing that on the streams side
> > is not enough since by the end of the day we still need the consumer to
> > incorporate it (today it's via a static config and hence we cannot just
> > dynamically change the config).
> >
> > 2. Regarding the API itself, I want to see a more concrete proposal that
> > contains the full signature, e.g. does ".closeAndLeaveGroup()" include
> > the timeout param as well, etc? My very subjective preference is to not
> > differentiate by the function name in case we may want to augment the
> > close function in the future, which would explode the function names :P
> > Instead maybe we can just overload the `close()` function again but with
> > a control object, that includes 1) timeout, 2) leave-group flag, and
> > hence can also extend to include more variables just in case. Would like
> > to hear others' thoughts as well.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jan 12, 2022 at 5:51 AM Seung-chan Ahn <[email protected]>
> > wrote:
> >
> > > Hi team,
> > >
> > > Here's the new KIP
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group>
> > > for this issue <https://issues.apache.org/jira/browse/KAFKA-13217>.
> > >
> > > The title says pretty much what this KIP is for. Even though it's my
> > > first draft, as A. Sophie Blee-Goldman has written rich descriptions
> > > and already the solutions in the issue thread, I've enjoyed following
> > > up on the idea.
> > >
> > > Please feel free to review on any point!
> >
> >
> > --
> > -- Guozhang
>

--
-- Guozhang
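
For readers following the thread, the control-object overload of `close()` discussed above might look roughly like the sketch below. It is only an illustration under stated assumptions, not the signature from the KIP: `FakeStreams` and `FakeThread` are hypothetical stand-ins for `KafkaStreams` and `StreamThread`, and the `CloseOptions` fields and defaults are guesses based on the two parameters named in the thread (timeout and leave-group flag). The one behaviour taken from the discussion is that the leave-group request is made on every thread before that thread is shut down.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class CloseOptionsSketch {

    /** Hypothetical control object bundling the close() parameters from the thread. */
    static final class CloseOptions {
        // Assumed defaults: wait effectively forever, do not leave the group.
        private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
        private boolean leaveGroup = false;

        CloseOptions timeout(Duration timeout) {
            this.timeout = timeout;
            return this;
        }

        CloseOptions leaveGroup(boolean leaveGroup) {
            this.leaveGroup = leaveGroup;
            return this;
        }
    }

    /** Stand-in for a stream thread; it only records which calls were made. */
    static final class FakeThread {
        boolean leaveGroupRequested = false;
        boolean shutDown = false;

        void requestLeaveGroupDuringShutdown() {
            leaveGroupRequested = true;
        }

        void shutdown() {
            shutDown = true;
        }
    }

    /** Stand-in for the streams client that owns the threads. */
    static final class FakeStreams {
        final List<FakeThread> threads = new ArrayList<>();

        FakeStreams(int numThreads) {
            for (int i = 0; i < numThreads; i++) {
                threads.add(new FakeThread());
            }
        }

        /** Single overload; new knobs would be added to CloseOptions, not to the method name. */
        boolean close(CloseOptions options) {
            if (options.timeout.isNegative()) {
                throw new IllegalArgumentException("Timeout can't be negative.");
            }
            if (options.leaveGroup) {
                // Ask every thread to leave the group before any thread is shut down.
                for (FakeThread thread : threads) {
                    thread.requestLeaveGroupDuringShutdown();
                }
            }
            for (FakeThread thread : threads) {
                thread.shutdown();
            }
            // A real implementation would also honour the timeout; omitted in this sketch.
            return true;
        }
    }

    public static void main(String[] args) {
        FakeStreams streams = new FakeStreams(2);
        CloseOptions options = new CloseOptions()
                .timeout(Duration.ofSeconds(30))
                .leaveGroup(true);
        boolean closed = streams.close(options);
        System.out.println("closed=" + closed
                + ", leaveGroupRequested=" + streams.threads.get(0).leaveGroupRequested);
    }
}
```

With this shape, a caller that is only bouncing the application (case 2) would keep `leaveGroup(false)` and rely on the timeout, while a caller shutting down for good (case 3) would pass `leaveGroup(true)` to trigger the rebalance right away.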
