Hello Philip,

Thanks for writing down the 1-pager. Just to clarify, the reason we wrote this as a 1-pager instead of a KIP is that so far all the implementation work should be internal, and hence should not impact any public APIs. If, during implementation, we find that some public interfaces, such as metrics, need to be modified, then we will send out a separate thread for that as a KIP proposal, right?

I made a pass on the 1-pager and also some of the ongoing developments. Here are some high-level comments:

On the 1-pager:

1) I think it's better to clarify the scenarios under manual assignment, a.k.a. `consumer.assign()`. E.g. does the background thread still always try to discover the coordinator even if there are no commit requests (note that, if yes, this would be a behavioral change, since some users like Connect may rely on the consumer never trying to discover the coordinator with manual assignment and no commits to brokers)? From the description it seems that if there are no events from the channel requesting offset commits, it would not try to discover the coordinator, but then the background thread's state would stay in `down` or `initialized` and never reach `stable`, and hence we should also allow a transition from `initialized` to `down` directly, right?

2) From the polling thread, besides the `poll` function changes, I think it's better to also state the changes to other blocking functions like commitSync. I'd assume e.g. for commitSync it would be implemented as:

* Send the commit-request to the server-event channel
* Continue polling from the consumer event channel, but skip other events like rebalance-listener (we still need to bookkeep them for the next `poll` call, but we just cannot trigger them since that breaks compatibility) until we receive the commit response event.

Some details about how those functions would be implemented would also be very helpful for the community's audience.

3) I have a few questions about the rebalance state machine section:

3.a) You mentioned in the state machine:

"Wait for the partition revoked event, and advance the state to PARTITIONS_REVOKED"
"Wait for partition assignment completion from the polling thread. Advance to PARTITIONS_ASSIGNED"

But we do not have those states today; I think you meant to say "PREPARE_REBALANCING" and "STABLE"?

3.b)
Also, it seems that the proposed state transition would be Stable -> Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing -> Assigning_Partitions -> Stable (for the eager protocol at least), but when auto commit is enabled, we also need to commit offsets for those revoking partitions, and the auto-committing happens before the `onPartitionsRevoked` listener is triggered, so should auto-committing be part of the `Revoking_Partitions` state as well?

4) Could you expand on the "Exceptions thrown will be different." description and list all changes to the exceptions (note that, if they do exist, we also need a KIP)? For example, besides WakeupException, Kafka also has an InterruptException (different from Java's own InterruptedException) defined on the public APIs; are we going to change which functions throw it and which do not?

5) In the testing plan:

5.a) Could you elaborate a bit more on "We need to make sure the timing of the 1. coordinator discovery and 2. joinGroup operations are being done in the correct timing."?

5.b) I'd also add that for all the blocking APIs, including `poll`, where a timeout value is provided either as a param or implicitly from `default.api.timeout.ms`, the timeout would now be strictly respected; to me this is also one of the key motivations of this refactoring :)

-------------------------

And the current POC PRs:

1. Just a quick thought about the naming of "KafkaServerEventQueue": I agree with others that it may be confusing to include `KafkaServer` in a client class name. What about renaming it to `ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know that it sounds a bit awkward to have the `ResponseEventQueue` also return rebalance-listener-triggering events, but that may be less confusing.

2. I'd suggest that for new modules like `ConsumerBackgroundThread`, we first define an interface in the `internals` package, e.g.
`RequestEventHandler` (assuming the previous rename suggestion), and then have a `DefaultRequestEventHandler` implementation class which encapsulates the background thread. This enables us to easily write unit tests that isolate other modules, especially with concurrent threading.

3. For `KafkaServerEventType`: where would NOOP be used? Also, I think there are other types, like FETCH_COMMITTED, as well?

Thanks,
Guozhang

On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com> wrote:

> Hi all,
>
> Here is the proposal to refactor the Kafka Consumer
> <https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor>.
> The 1-pager is on the wiki, so please take a look at it. Also, this is a
> prerequisite for KIP-848 (the next gen rebalance protocol).
>
> Cheers,
> P
>

--
-- Guozhang
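
To make comment 2) on the 1-pager a bit more concrete, here is a rough sketch of how a blocking `commitSync` could sit on top of two event queues. This is only an illustration under assumptions, not the POC code: `CommitSyncSketch`, `ResponseEvent`, `ResponseType`, `drainDeferred` and the plain string used as a commit request are all hypothetical names. It sends the commit request, keeps polling the response queue until the commit response arrives, sets rebalance-listener events aside for the next `poll` call instead of triggering them, and gives up once the caller's timeout has elapsed.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical polling-thread side of commitSync; all names are illustrative.
final class CommitSyncSketch {
    private final BlockingQueue<String> requestQueue;          // polling thread -> background thread
    private final BlockingQueue<ResponseEvent> responseQueue;  // background thread -> polling thread
    private final Queue<ResponseEvent> deferred = new ArrayDeque<>();

    CommitSyncSketch(BlockingQueue<String> requestQueue,
                     BlockingQueue<ResponseEvent> responseQueue) {
        this.requestQueue = requestQueue;
        this.responseQueue = responseQueue;
    }

    void commitSync(Duration timeout) throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        requestQueue.add("COMMIT"); // stand-in for a real commit-request event
        while (true) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            // poll returns null if nothing arrives within the remaining time.
            ResponseEvent event = responseQueue.poll(remainingNanos, TimeUnit.NANOSECONDS);
            if (event == null) {
                throw new TimeoutException("no commit response within " + timeout);
            }
            if (event.type == ResponseType.COMMIT_RESPONSE) {
                return;
            }
            // Not ours: bookkeep it so the next poll() call can trigger it.
            deferred.add(event);
        }
    }

    // Events skipped during commitSync, to be handled by the next poll() call.
    List<ResponseEvent> drainDeferred() {
        List<ResponseEvent> drained = new ArrayList<>(deferred);
        deferred.clear();
        return drained;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        BlockingQueue<ResponseEvent> responses = new LinkedBlockingQueue<>();
        // Simulate the background thread: a listener event arrives before the commit response.
        responses.add(new ResponseEvent(ResponseType.REBALANCE_LISTENER, "onPartitionsRevoked"));
        responses.add(new ResponseEvent(ResponseType.COMMIT_RESPONSE, "ok"));

        CommitSyncSketch sketch = new CommitSyncSketch(requests, responses);
        sketch.commitSync(Duration.ofSeconds(1));
        System.out.println("deferred events: " + sketch.drainDeferred().size());
    }
}

// Hypothetical event types; the real POC may define different ones.
enum ResponseType { COMMIT_RESPONSE, REBALANCE_LISTENER }

// Hypothetical event passed from the background thread to the polling thread.
final class ResponseEvent {
    final ResponseType type;
    final String detail;

    ResponseEvent(ResponseType type, String detail) {
        this.type = type;
        this.detail = detail;
    }
}
```

Because the sketch only depends on the two queues, a unit test can feed it canned response events without starting a real background thread, which is the same isolation benefit the interface suggestion in the POC comments is after.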