Hi Guozhang,

Thank you for the reviews, and I agree with your suggestions: If we find
any behavior changes, we will issue a KIP. Is it possible to first publish
the new implementation in a separate class so that we don't disrupt the
existing implementation? Then we can issue a KIP to address the regressions.

To your questions:
1. I think coordinator discovery will happen on demand, i.e., the
background thread will only try to discover a coordinator when an event
(e.g., commit) requires one. I've noted that, and I'll make sure it is
spelled out in the proposal.
2. Acked. I think I'll add a section about how each operation works in the
new implementation.
3a. I will modify the existing state enum in the new implementation.
3b. Ah, I must have missed this when proofreading the document. I think the
state sequence should be:
  Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked ->
Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
Partitions_Assigned -> Stable
  I've made a note of that, and I'll update the document.
4. Noted. I'll add a section about exception handling.
5a. Kirk had the same comment, and I've updated the document.
5b. Yes!
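
For reference, the sequence in 3b could be encoded roughly as below. This is
only a sketch under illustrative assumptions: the enum name RebalanceState,
the next() and canTransitionTo() helpers, and the strictly linear ordering
are hypothetical and not taken from the existing consumer code. A real state
machine would also need failure and reset transitions.

```java
// Hypothetical sketch: names and the strictly linear ordering are assumptions.
enum RebalanceState {
    UNJOINED,
    COMMIT_ASYNC,
    REVOKING_PARTITIONS,
    PARTITIONS_REVOKED,
    JOIN_GROUP,
    COMPLETED_REBALANCING,
    ASSIGNING_PARTITIONS,
    PARTITIONS_ASSIGNED,
    STABLE;

    /** Next state in the linear sequence; STABLE is terminal in this sketch. */
    RebalanceState next() {
        return this == STABLE ? STABLE : values()[ordinal() + 1];
    }

    /** Only a step to the immediate successor is allowed. */
    boolean canTransitionTo(RebalanceState target) {
        return this != STABLE && target == next();
    }
}

class RebalanceStateDemo {
    public static void main(String[] args) {
        // Walk the whole sequence once and print it.
        RebalanceState state = RebalanceState.UNJOINED;
        StringBuilder path = new StringBuilder(state.name());
        while (state != RebalanceState.STABLE) {
            state = state.next();
            path.append(" -> ").append(state.name());
        }
        System.out.println(path);
    }
}
```

Keeping the order in the enum declaration means the allowed transitions can
be checked in one place, which should make it easier to unit test.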

Regarding the POC comments, I think the POC branch is actually quite dated.
I need to update it 😅. Do you think we can start with simple PRs, like
skeletons of ConsumerBackgroundThread.java and the EventHandler interface
and its implementations?

1. I agree the naming can be confusing, but I think the usage of response
and request has implications for the type of event. As you said, BT -> PT
handles error and callback events, and PT -> BT handles mostly API calls.
I'll give the naming more thought.
2. Great suggestions.
3. I think I'll start by defining a few basic events, like assign,
commit, and fetch. Then we can extend them as the project progresses.
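
To make the skeleton idea concrete, here is a rough sketch of an event
handler with one queue per direction (PT -> BT and BT -> PT), plus the
on-demand coordinator lookup from the first list. Everything here is
hypothetical: RequestEventType, ResponseEvent, QueueBackedEventHandler,
runOnce(), and the requiresCoordinator flag are illustrative names, the
choice of which events need a coordinator is an assumption, and the lookup
is only a counter standing in for the real request.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: all names below are illustrative, not Kafka APIs.

/** Events sent from the polling thread (PT) to the background thread (BT). */
enum RequestEventType {
    ASSIGN(false),
    COMMIT(true),   // assumption: only commit needs the coordinator here
    FETCH(false);

    final boolean requiresCoordinator;

    RequestEventType(boolean requiresCoordinator) {
        this.requiresCoordinator = requiresCoordinator;
    }
}

/** Events sent from BT back to PT (errors, callbacks, results). */
final class ResponseEvent {
    final String description;

    ResponseEvent(String description) {
        this.description = description;
    }
}

/** The seam between the two threads; easy to fake in unit tests. */
interface EventHandler {
    boolean add(RequestEventType event);   // PT -> BT
    Optional<ResponseEvent> poll();        // BT -> PT, non-blocking
}

final class QueueBackedEventHandler implements EventHandler {
    private final BlockingQueue<RequestEventType> requests = new LinkedBlockingQueue<>();
    private final BlockingQueue<ResponseEvent> responses = new LinkedBlockingQueue<>();
    private boolean coordinatorKnown = false;
    private int coordinatorLookups = 0;

    @Override
    public boolean add(RequestEventType event) {
        return requests.offer(event);
    }

    @Override
    public Optional<ResponseEvent> poll() {
        return Optional.ofNullable(responses.poll());
    }

    /** Stand-in for one iteration of the background thread loop. */
    boolean runOnce() {
        RequestEventType event = requests.poll();
        if (event == null) {
            return false;   // nothing queued, so no coordinator lookup either
        }
        if (event.requiresCoordinator && !coordinatorKnown) {
            coordinatorLookups++;   // placeholder for a real coordinator lookup
            coordinatorKnown = true;
        }
        responses.offer(new ResponseEvent("handled " + event));
        return true;
    }

    int coordinatorLookups() {
        return coordinatorLookups;
    }
}

class EventHandlerDemo {
    public static void main(String[] args) {
        QueueBackedEventHandler handler = new QueueBackedEventHandler();
        handler.add(RequestEventType.ASSIGN);
        handler.add(RequestEventType.COMMIT);
        while (handler.runOnce()) {
            handler.poll().ifPresent(r -> System.out.println(r.description));
        }
        System.out.println("coordinator lookups: " + handler.coordinatorLookups());
    }
}
```

In a real skeleton runOnce() would be driven by the background thread's
loop. Calling it directly here keeps the sketch single-threaded and easy to
test.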

Thanks,
P


On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Philip,
>
> Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
> this as a 1-pager instead of a KIP is that so far all the implementations
> should be internal, and hence not impacting anything on the public APIs.
> If, e.g. we found along with the implementation that some public interfaces
> like metrics, need to be modified, then we will send out a separate thread
> for that in a KIP proposal, right?
>
> I made a pass on the 1-pager and also some of the ongoing developments,
> here are just some high-level comments:
>
> On the 1-pager:
>
> 1) I think it's better to clarify the scenarios under manual assignment
> a.k.a. `consumer.assign()`. E.g. does the background thread still always
> try to discover the coordinator even if there are no commit requests (note
> that, if yes, this would be a behavioral change since some users like
> Connect may rely on the consumer to not ever try to discover the
> coordinator with manual assignment and no commit to brokers)? From the
> description it seems that if there are no events from the channel requesting
> offset commits, it would not try to discover the coordinator, but then the
> background thread's state would be in `down` and `initialized`, not
> in `stable`, and hence we should also allow transition from `initialized`
> to `down` directly, right?
>
> 2) From the polling thread, besides the `poll` function changes, I think
> it's better to also state other blocking function changes like commitSync
> as well. I'd assume e.g. for commitSync it would be implemented as:
>
> * Send the commit-request to the server-event channel
> * Continue polling from the consumer event channel, but skip other events
> like rebalance-listener (we still need to bookkeep them for the next `poll`
> call, but we just cannot trigger them since that breaks compatibility)
> until we receive the commit response event.
>
> Some details about how those functions would be implemented would also be
> very helpful for the community's audience.
>
> 3) I have a few questions from the rebalance state machine section:
>
> 3.a). you mentioned in the state machine:
>
> "Wait for the partition revoked event, and advance the state to
> PARTITIONS_REVOKED"
> "Wait for partition assignment completion from the polling thread.  Advance
> to PARTITIONS_ASSIGNED"
>
> But we do not have those states today, I think you meant to say
> "PREPARE_REBALANCING" and "STABLE"?
>
> 3.b). Also, it seems that the proposed state transition would be Stable ->
> Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
> Assigning_Partitions -> Stable (for eager protocol at least), but when auto
> commit is enabled, we also need to commit offsets for those revoking
> partitions, and the auto-committing happens before the
> `onPartitionsRevoked` listener is triggered, so should auto-committing be
> part of the `Revoking_Partitions` state as well?
>
> 4) Could you expand on the "Exceptions thrown will be different."
> description and list all changes to the exceptions (note that, if they do
> exist, we also need a KIP)? For example, besides WakeupException, Kafka
> also has an InterruptException (different from Java's own
> InterruptedException) defined on the public APIs; are we going to change
> which functions would throw and which will not?
>
> 5) In the testing plan:
>
> 5.a) Could you elaborate a bit more on "We need to make sure the timing of
> the 1.  coordinator discovery and 2.  joinGroup operations are being done
> in the correct timing."
>
> 5.b) I'd also add that for all the blocking APIs including `poll` where a
> timeout value is provided either as param, or implicitly from
> `default.api.timeout.ms` they would now be strictly respected --- to me
> this is also one of the key motivations of this refactoring :)
>
> -------------------------
>
> And the current POC PRs:
>
> 1. Just a quick thought about the naming of "KafkaServerEventQueue": I
> think I also agree with others that it may be confusing to include
> `KafkaServer` on a client class, what about renaming it to
> `ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know that
> it sounds a bit awkward to have the `ResponseEventQueue` to also return
> rebalance-listener-triggering events, but that may be less confusing.
>
> 2. I'd suggest that for new modules like `ConsumerBackgroundThread`, we first
> define an interface in the `internals` package, e.g. `RequestEventHandler`
> (assuming the previous rename suggestion), and then have a
> `DefaultRequestEventHandler` implementation class which encapsulates the
> background thread. This enables us to easily write unit tests that isolate
> other modules, especially with concurrent threads.
>
> 3. For `KafkaServerEventType`: where would NOOP be used? Also I think
> there are other types, like FETCH_COMMITTED, as well?
>
>
>
> Thanks,
> Guozhang
>
>
> On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com> wrote:
>
> > Hi all,
> >
> > Here is the proposal to refactor the Kafka Consumer
> > <https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor>.
> > The 1-pager is on the wiki, so please take a look at it.  Also, this is a
> > prerequisite for KIP-848 (the next gen rebalance protocol).
> >
> > Cheers,
> > P
> >
>
>
> --
> -- Guozhang
>
