Hi Guozhang,

Thank you for the review. I agree with your suggestion: if we find any behavior changes, we will issue a KIP. Is it possible to first publish the new implementation in a separate class so that we don't disrupt the existing implementation? Then we can issue a KIP to address the regressions.

To your questions:

1. I think the coordinator discovery will happen on demand, i.e., the background thread will only try to discover a coordinator when an event (e.g., commit) requires one. I've noted that, and I'll make sure it is stated in the proposal.
2. Acked. I think I'll add a section about how each operation works in the new implementation.
3a. I will modify the existing state enum in the new implementation.
3b. Ah, I must have missed this when proofreading the document. I think the state transitions should be:
Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked -> Join_Group -> Completed_Rebalancing -> Assigning_Partitions -> Partitions_Assigned -> Stable
I've made a note of that, and I'll update the document.
4. Noted. I'll add a section about exception handling.
5a. Kirk also had the same comment. I updated the document.
5b. Yes!

Regarding the POC comments, I think the POC branch is actually quite dated. I need to update it 😅. Do you think we can start with simple PRs like skeletons of ConsumerBackgroundThread.java and the EventHandler interface and implementations?

1. I agree the naming can be confusing, but I think the usage of response and request has implications for the type of event. As you said, BT -> PT handles error and callback events, and PT -> BT handles mostly API calls. I'll think again about the naming.
2. Great suggestions.
3. I think I'll start by defining a few basic events, like assign, commit, and fetch. Then we can modify them as we progress with the project.

Thanks,
P

On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Philip,
>
> Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
> this as a 1-pager instead of a KIP is that so far all the implementations
> should be internal, and hence not impacting anything on the public APIs.
> If, e.g.
> we found along with the implementation that some public interfaces
> like metrics, need to be modified, then we will send out a separate thread
> for that in a KIP proposal, right?
>
> I made a pass on the 1-pager and also some of the ongoing developments,
> here are just some high-level comments:
>
> On the 1-pager:
>
> 1) I think it's better to clarify the scenarios under manual assignment
> a.k.a. `consumer.assign()`. E.g. does the background thread still always
> try to discover the coordinator even if there are no commit requests (note
> that, if yes, this would be a behavioral change since some users like
> Connect may rely on the consumer to not ever try to discover the
> coordinator with manual assignment and no commit to brokers)? From the
> description it seems if there are no events from the channel to request
> committing offsets, it would not try to discover the coordinator, but then the
> background thread's state would be in `down` and `initialized`, not
> in `stable`, and hence we should also allow transition from `initialized`
> to `down` directly, right?
>
> 2) From the polling thread, besides the `poll` function changes, I think
> it's better to also state other blocking function changes like commitSync
> as well. I'd assume e.g. for commitSync it would be implemented as:
>
> * Send the commit-request to the server-event channel
> * Continue polling from the consumer event channel, but skip other events
> like rebalance-listener (we still need to bookkeep it for the next `poll`
> call, but we just cannot trigger them since that breaks compatibility)
> until we received the commit response event.
>
> Some details about how those functions would be implemented would also be
> very helpful for the community's audience.
>
> 3) I have a few questions from the rebalance state machine section:
>
> 3.a).
> you mentioned in the state machine:
>
> "Wait for the partition revoked event, and advance the state to
> PARTITIONS_REVOKED"
> "Wait for partition assignment completion from the polling thread. Advance
> to PARTITIONS_ASSIGNED"
>
> But we do not have those states today, I think you meant to say
> "PREPARE_REBALANCING" and "STABLE"?
>
> 3.b). Also, it seems that the proposed state transition would be Stable ->
> Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
> Assigning_Partitions -> Stable (for eager protocol at least), but when auto
> commit is enabled, we also need to commit offsets for those revoking
> partitions, and the auto-committing happens before the
> `onPartitionsRevoked` listener is triggered, so should auto-committing be
> part of the `Revoking_Partitions` state as well?
>
> 4) Could you expand on the "Exceptions thrown will be different."
> description and list all changes to the exceptions (note that, if they do
> exist, we also need a KIP). For example, besides WakeupException, Kafka
> also has an InterruptException (different from Java's own
> InterruptedException) defined on the public APIs, are we going to change
> which functions would throw and which will not?
>
> 5) In the testing plan:
>
> 5.a) Could you elaborate a bit more on "We need to make sure the timing of
> the 1. coordinator discovery and 2. joinGroup operations are being done
> in the correct timing."
>
> 5.b) I'd also add that for all the blocking APIs including `poll` where a
> timeout value is provided either as param, or implicitly from
> `default.api.timeout.ms` they would now be strictly respected; to me
> this is also one of the key motivations of this refactoring :)
>
> -------------------------
>
> And the current POC PRs:
>
> 1.
> Just a quick thought about the naming of "KafkaServerEventQueue": I
> think I also agree with others that it may be confusing to include
> `KafkaServer` on a client class, what about renaming it to
> `ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know that
> it sounds a bit awkward to have the `ResponseEventQueue` also return
> rebalance-listener-triggering events, but that may be less confusing.
>
> 2. I'd suggest for new modules like `ConsumerBackgroundThread`, we first
> define an interface in the `internals` package, e.g. `RequestEventHandler`
> (assuming the previous rename suggestion), and then have a
> `DefaultRequestEventHandler` implementation class which encapsulates the
> background thread. This enables us to easily write unit tests that isolate
> other modules, especially with concurrent threading.
>
> 3. For `KafkaServerEventType`: where would NOOP be used? Also I think
> there are other types, like FETCH_COMMITTED as well?
>
>
>
> Thanks,
> Guozhang
>
>
> On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com> wrote:
>
> > Hi all,
> >
> > Here is the proposal to refactor the Kafka Consumer
> > <https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor>.
> > The 1-pager is on the wiki, so please take a look at it. Also, this is a
> > prerequisite for KIP-848 (the next gen rebalance protocol).
> >
> > Cheers,
> > P
> >
>
>
> --
> -- Guozhang
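
For readers following the thread, the commitSync flow Guozhang outlines in point 2 (send a commit request to the background thread, keep draining the response channel, and set aside rebalance-listener events for the next `poll` until the commit response arrives) could be sketched roughly as below. This is only an illustration under assumptions: the class `CommitSyncSketch`, the `Event` and `EventType` types, and the queue and field names are all hypothetical and are not taken from the POC branch or the 1-pager. The sketch also returns `false` on timeout or interruption for simplicity, which may not match how the real consumer would surface those cases.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class CommitSyncSketch {
    // Illustrative event kinds exchanged between the two threads.
    enum EventType { COMMIT_REQUEST, COMMIT_RESPONSE, REBALANCE_LISTENER }

    static final class Event {
        final EventType type;
        Event(EventType type) { this.type = type; }
    }

    // Polling thread -> background thread (assumed name).
    final BlockingQueue<Event> requestQueue = new LinkedBlockingQueue<>();
    // Background thread -> polling thread (assumed name).
    final BlockingQueue<Event> responseQueue = new LinkedBlockingQueue<>();
    // Events seen while waiting for the commit; kept for the next poll().
    final Queue<Event> deferred = new ArrayDeque<>();

    // Returns true if a commit response arrived before the timeout elapsed.
    boolean commitSync(long timeoutMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        // Step 1: hand the commit request to the background thread.
        requestQueue.add(new Event(EventType.COMMIT_REQUEST));
        try {
            while (true) {
                long remainingNs = deadlineNs - System.nanoTime();
                if (remainingNs <= 0) {
                    return false; // timeout is strictly respected
                }
                // Step 2: wait for the next event from the background thread.
                Event e = responseQueue.poll(remainingNs, TimeUnit.NANOSECONDS);
                if (e == null) {
                    return false; // timed out while waiting
                }
                if (e.type == EventType.COMMIT_RESPONSE) {
                    return true;
                }
                // Any other event (e.g. rebalance listener) is not triggered
                // here; it is only bookkept for the next poll() call.
                deferred.add(e);
            }
        } catch (InterruptedException ie) {
            // Simplification: restore the interrupt flag and report failure.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch a later `poll` would drain `deferred` before reading new events, so listener callbacks keep their original order relative to each other.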