On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Philip,
>
> On Fri, Sep 16, 2022 at 1:20 PM Philip Nee <philip...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thank you for the reviews, and I agree with your suggestions: If we find
> > any behavior changes, we will issue a KIP. Is it possible to first publish
> > the new implementation in a separate class so that we don't disrupt the
> > existing implementation? Then we can issue a KIP to address the
> > regressions.
> >
>
> Yes! That's what I meant to clarify. Just in case other people may be
> confused why we did not propose a KIP right away, but first a one-pager.
>
> >
> > To your questions:
> > 1. I think the coordinator discovery will happen on demand, i.e., the
> > background thread will only try to discover a coordinator when the event
> > (e.g., commit) requires one. I've noted that, and I'll make sure to state
> > it in the proposal.
> >
>
> Thanks. Could you then clarify that for the coordinator state transition?
> Is that just between `down` and `initialized`, and then `initialized` can
> transition back to `down` too?
>
Will do, I'll update the 1-pager to clarify it.

> >
> > 2. Acked. I think I'll add a section about how each operation works in the
> > new implementation.
> > 3a. I will modify the existing state enum in the new implementation.
> > 3b. Ah, I must have missed this when proofreading the document. I think the
> > state should be:
> > Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked ->
> > Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
> > Partitions_Assigned -> Stable
> > I've made a note of that, and I'll update the document.
> >
>
> Got it. If we are introducing a third state for auto committing, then upon
> completing the rebalance, we may also need to transition to that state,
> since we may also revoke partitions, right? This is for fixing KAFKA-14224.
>
- Instead of introducing a new state, I think I want to break up
PREPARING_REBALANCE into Commit and Revoke.
So, for your suggestion in KAFKA-14224, I think we could do it this way:
1. REVOKING_PARTITION: mute the partitions and send the callback event to
the polling thread.
2. PARTITION_REVOKED: the revocation ack has been received. Advance the
state to COMMIT.
3. COMMIT: commit the offsets. Wait for the commit to finish, then we can
start the join group.
Again, thanks, I'll update the 1-pager with these changes.

> >
> > 4. Noted. I'll add a section about exception handling.
> > 5a. Kirk also had the same comment. I updated the document.
> > 5b. Yes!
> >
> > Regarding the POC comments, I think the POC branch is actually quite
> > dated. I need to update it 😅. Do you think we can start with simple PRs
> > like skeletons of ConsumerBackgroundThread.java and the EventHandler
> > interface and implementations?
> >
>
> Yes, I think as long as we are introducing new classes / interfaces in the
> internals package, we can incrementally merge PRs even if they are not
> integrated with the public KafkaConsumer class yet.
>
> >
> > 1. I agree the naming can be confusing, but I think the usage of response
> > and request has implications for the type of event. As you said, BT -> PT
> > handles error and callback events, and PT -> BT handles mostly API calls.
> > I'll think again about these names.
> > 2. Great suggestions.
> > 3. I think I'll start with defining a few basic events, like assign,
> > commits, and fetch. Then we can modify that as we progress with the
> > project.
> >
> > Thanks,
> > P
> >
> > On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Philip,
> > >
> > > Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
> > > this as a 1-pager instead of a KIP is that so far all the implementations
> > > should be internal, and hence not impact anything on the public APIs.
> > > If, e.g., we found during the implementation that some public
> > > interfaces, like metrics, need to be modified, then we will send out a
> > > separate thread for that in a KIP proposal, right?
> > >
> > > I made a pass on the 1-pager and also some of the ongoing developments;
> > > here are just some high-level comments:
> > >
> > > On the 1-pager:
> > >
> > > 1) I think it's better to clarify the scenarios under manual assignment,
> > > a.k.a. `consumer.assign()`. E.g., does the background thread still always
> > > try to discover the coordinator even if there are no commit requests
> > > (note that, if yes, this would be a behavioral change, since some users
> > > like Connect may rely on the consumer never trying to discover the
> > > coordinator with manual assignment and no commits to brokers)? From the
> > > description it seems that if there are no events from the channel
> > > requesting to commit offsets, it would not try to discover the
> > > coordinator, but then the background thread's state would be in `down`
> > > or `initialized`, not in `stable`, and hence we should also allow a
> > > transition from `initialized` to `down` directly, right?
> > >
> > > 2) From the polling thread, besides the `poll` function changes, I think
> > > it's better to also state other blocking function changes, like
> > > commitSync. I'd assume, e.g., commitSync would be implemented as:
> > >
> > > * Send the commit request to the server-event channel.
> > > * Continue polling from the consumer event channel, but skip other events
> > > like rebalance-listener events (we still need to bookkeep them for the
> > > next `poll` call, but we just cannot trigger them since that breaks
> > > compatibility) until we receive the commit response event.
> > >
> > > Some details about how those functions would be implemented would also be
> > > very helpful for the community's audience.
> > >
> > > 3) I have a few questions about the rebalance state machine section:
> > >
> > > 3.a) You mentioned in the state machine:
> > >
> > > "Wait for the partition revoked event, and advance the state to
> > > PARTITIONS_REVOKED"
> > > "Wait for partition assignment completion from the polling thread.
> > > Advance to PARTITIONS_ASSIGNED"
> > >
> > > But we do not have those states today; I think you meant to say
> > > "PREPARE_REBALANCING" and "STABLE"?
> > >
> > > 3.b) Also, it seems that the proposed state transition would be Stable ->
> > > Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
> > > Assigning_Partitions -> Stable (for the eager protocol at least), but
> > > when auto commit is enabled, we also need to commit offsets for the
> > > partitions being revoked, and the auto-committing happens before the
> > > `onPartitionsRevoked` listener is triggered, so should auto-committing
> > > be part of the `Revoking_Partitions` state as well?
> > >
> > > 4) Could you expand on the "Exceptions thrown will be different."
> > > description and list all changes to the exceptions (note that, if they
> > > do exist, we also need a KIP)? For example, besides WakeupException,
> > > Kafka also has an InterruptException (different from Java's own
> > > InterruptedException) defined on the public APIs; are we going to
> > > change which functions would throw it and which will not?
> > >
> > > 5) In the testing plan:
> > >
> > > 5.a) Could you elaborate a bit more on "We need to make sure the timing
> > > of the 1. coordinator discovery and 2. joinGroup operations are being
> > > done in the correct timing."
> > >
> > > 5.b) I'd also add that for all the blocking APIs, including `poll`, where
> > > a timeout value is provided either as a parameter or implicitly from
> > > `default.api.timeout.ms`, the timeout would now be strictly respected. To
> > > me, this is also one of the key motivations of this refactoring :)
> > >
> > > -------------------------
> > >
> > > And the current POC PRs:
> > >
> > > 1. Just a quick thought about the naming of "KafkaServerEventQueue": I
> > > also agree with others that it may be confusing to include `KafkaServer`
> > > in a client class name; what about renaming it to
> > > `ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know
> > > that it sounds a bit awkward to have the `ResponseEventQueue` also return
> > > rebalance-listener-triggering events, but that may be less confusing.
> > >
> > > 2. I'd suggest that for new modules like `ConsumerBackgroundThread`, we
> > > first define an interface in the `internals` package, e.g.
> > > `RequestEventHandler` (assuming the previous rename suggestion), and then
> > > have a `DefaultRequestEventHandler` implementation class which
> > > encapsulates the background thread. This enables us to easily write unit
> > > tests that isolate other modules, especially with concurrent threads.
> > >
> > > 3. For `KafkaServerEventType`: where would NOOP be used? Also, I think
> > > there are other types, like FETCH_COMMITTED, as well?
> > >
> > > Thanks,
> > > Guozhang
> > >
> > > On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Here is the proposal to refactor the Kafka Consumer
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor>.
> > > > The 1-pager is on the wiki, so please take a look at it. Also, this is a
> > > > prerequisite for KIP-848 (the next gen rebalance protocol).
> > > >
> > > > Cheers,
> > > > P
> > > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
> --
> -- Guozhang
>
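The revocation sequence proposed in this thread for KAFKA-14224 (mute the partitions and hand the callback to the polling thread, wait for the ack, commit, and only then join) can be made explicit as a transition table. This is a minimal sketch, assuming hypothetical state names that merge the sequences discussed here and covering only a rebalance triggered from a stable, eager-protocol member; it is not the POC code.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical states loosely following the thread; not the enum from the POC branch.
enum RebalanceState {
    STABLE,
    REVOKING_PARTITIONS,  // partitions muted, revocation callback event sent to the polling thread
    PARTITIONS_REVOKED,   // polling thread acknowledged that the revocation callback ran
    COMMITTING,           // committing offsets; JoinGroup must wait for this to finish
    JOINING_GROUP,        // join group in flight
    ASSIGNING_PARTITIONS  // assignment callback event sent to the polling thread
}

final class RebalanceStateMachine {
    // Legal successors of each state; any other transition is rejected.
    private static final Map<RebalanceState, Set<RebalanceState>> NEXT =
            new EnumMap<>(RebalanceState.class);

    static {
        NEXT.put(RebalanceState.STABLE, EnumSet.of(RebalanceState.REVOKING_PARTITIONS));
        NEXT.put(RebalanceState.REVOKING_PARTITIONS, EnumSet.of(RebalanceState.PARTITIONS_REVOKED));
        NEXT.put(RebalanceState.PARTITIONS_REVOKED, EnumSet.of(RebalanceState.COMMITTING));
        NEXT.put(RebalanceState.COMMITTING, EnumSet.of(RebalanceState.JOINING_GROUP));
        NEXT.put(RebalanceState.JOINING_GROUP, EnumSet.of(RebalanceState.ASSIGNING_PARTITIONS));
        NEXT.put(RebalanceState.ASSIGNING_PARTITIONS, EnumSet.of(RebalanceState.STABLE));
    }

    private RebalanceState state = RebalanceState.STABLE;

    RebalanceState state() {
        return state;
    }

    // Background thread: the polling thread acked the revocation, so start committing.
    void onRevocationAcked() {
        transitionTo(RebalanceState.PARTITIONS_REVOKED);
        transitionTo(RebalanceState.COMMITTING);
    }

    // Background thread: the offset commit finished, so the join group may start.
    void onCommitCompleted() {
        transitionTo(RebalanceState.JOINING_GROUP);
    }

    void transitionTo(RebalanceState next) {
        if (!NEXT.get(state).contains(next)) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

Keeping the legal successors in one table makes an ordering bug, such as joining before the commit has completed, fail loudly instead of silently reordering steps.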
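Guozhang's outline of `commitSync` (send the commit request, keep draining the consumer event channel while deferring rebalance-listener events, stop at the commit response) combined with the strict-timeout goal from 5.b could look roughly like this. It is a minimal sketch, assuming hypothetical event classes, a single polling thread, and plain `BlockingQueue`s standing in for the request/response event queues. Note that Kafka's real `commitSync(Duration)` throws the unchecked `org.apache.kafka.common.errors.TimeoutException`, whereas this sketch uses `java.util.concurrent.TimeoutException`.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical events: PT -> BT requests and BT -> PT responses.
interface RequestEvent {}
interface ResponseEvent {}

final class CommitRequestEvent implements RequestEvent {
    final long requestId;
    CommitRequestEvent(long requestId) { this.requestId = requestId; }
}

final class CommitResponseEvent implements ResponseEvent {
    final long requestId;
    final RuntimeException error; // null when the commit succeeded
    CommitResponseEvent(long requestId, RuntimeException error) {
        this.requestId = requestId;
        this.error = error;
    }
}

// E.g. "invoke onPartitionsRevoked": must not be triggered from inside commitSync.
final class RebalanceCallbackEvent implements ResponseEvent {
    final String callback;
    RebalanceCallbackEvent(String callback) { this.callback = callback; }
}

final class PollingThreadCommitter {
    private final BlockingQueue<RequestEvent> requestQueue;   // PT -> BT
    private final BlockingQueue<ResponseEvent> responseQueue; // BT -> PT
    // Events seen while blocked in commitSync, kept for the next poll() call.
    private final Deque<ResponseEvent> deferred = new ArrayDeque<>();
    private long nextRequestId = 0;

    PollingThreadCommitter(BlockingQueue<RequestEvent> requestQueue,
                           BlockingQueue<ResponseEvent> responseQueue) {
        this.requestQueue = requestQueue;
        this.responseQueue = responseQueue;
    }

    void commitSync(Duration timeout) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        long id = nextRequestId++;
        if (!requestQueue.offer(new CommitRequestEvent(id), timeout.toNanos(), TimeUnit.NANOSECONDS)) {
            throw new TimeoutException("could not enqueue commit " + id + " within " + timeout);
        }
        while (true) {
            long remaining = deadline - System.nanoTime();
            // Never wait past the caller's deadline; null means the time ran out.
            ResponseEvent event = remaining > 0 ? responseQueue.poll(remaining, TimeUnit.NANOSECONDS) : null;
            if (event == null) {
                throw new TimeoutException("commit " + id + " not acknowledged within " + timeout);
            }
            if (event instanceof CommitResponseEvent && ((CommitResponseEvent) event).requestId == id) {
                CommitResponseEvent response = (CommitResponseEvent) event;
                if (response.error != null) {
                    throw response.error;
                }
                return;
            }
            // Bookkeep everything else (listener callbacks, stale responses) without triggering it.
            deferred.add(event);
        }
    }

    Deque<ResponseEvent> deferredEvents() {
        return deferred;
    }
}
```

Correlating by request id keeps a late response from an earlier, timed-out commit from being mistaken for the current one.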
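The on-demand coordinator discovery from point 1, and the question of whether `initialized` may fall back to `down`, can be illustrated with a two-state sketch. It assumes a hypothetical `lookup` supplier standing in for the FindCoordinator round trip. An event that does not need a coordinator, such as a fetch under manual `assign()` with no commits, never triggers discovery, and losing the coordinator moves the state from `initialized` straight back to `down`.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Named after the 1-pager's `down` / `initialized` states.
enum CoordinatorState { DOWN, INITIALIZED }

final class CoordinatorManager {
    // Hypothetical stand-in for a FindCoordinator request; empty means "not found yet".
    private final Supplier<Optional<String>> lookup;
    private CoordinatorState state = CoordinatorState.DOWN;
    private String coordinator; // e.g. "host:port"; null while DOWN
    private int lookups = 0;

    CoordinatorManager(Supplier<Optional<String>> lookup) {
        this.lookup = lookup;
    }

    // Called by the background thread per event. Discovery happens only when the event
    // needs a coordinator (e.g. a commit), so manual assignment without commits never looks one up.
    Optional<String> coordinatorFor(boolean eventNeedsCoordinator) {
        if (eventNeedsCoordinator && state == CoordinatorState.DOWN) {
            lookups++;
            Optional<String> found = lookup.get();
            if (found.isPresent()) {
                coordinator = found.get();
                state = CoordinatorState.INITIALIZED;
            }
        }
        return state == CoordinatorState.INITIALIZED ? Optional.of(coordinator) : Optional.empty();
    }

    // E.g. after a disconnect or NOT_COORDINATOR error: `initialized` -> `down` directly.
    void markCoordinatorUnknown() {
        coordinator = null;
        state = CoordinatorState.DOWN;
    }

    CoordinatorState state() { return state; }

    int lookups() { return lookups; }
}
```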