On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Philip,
>
> On Fri, Sep 16, 2022 at 1:20 PM Philip Nee <philip...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thank you for the reviews, and I agree with your suggestions: if we find
> > any behavior changes, we will issue a KIP. Is it possible to first publish
> > the new implementation in a separate class so that we don't disrupt the
> > existing implementation? Then we can issue a KIP to address the
> > regressions.
> >
> Yes! That's what I meant to clarify, just in case other people are
> confused about why we did not propose a KIP right away but wrote a
> one-pager first.
>
>
> > To your questions:
> > 1. I think the coordinator discovery will happen on demand, i.e., the
> > background thread will only try to discover a coordinator when an event
> > (e.g., a commit) requires one. I've noted that, and I'll make sure it is
> > reflected in the proposal.
> >
> Thanks. Could you then clarify that in the coordinator state transitions?
> Is it just between `down` and `initialized`, with `initialized` able to
> transition back to `down` as well?
>

Will do. I'll update the 1-pager to clarify it.
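Here is a minimal sketch of the on-demand discovery described in point 1. It rests on several assumptions: `CoordinatorManager`, `CoordinatorState`, and `EventKind` are hypothetical names, discovery is simulated as succeeding immediately, and only the `down` <-> `initialized` transitions are modeled. Events that need a coordinator (commits, committed-offset fetches) trigger a lookup. With manual assignment and no commits, the consumer never looks one up, and `initialized` can fall straight back to `down`.

```java
// Hypothetical coordinator states: only the DOWN <-> INITIALIZED part is modeled.
enum CoordinatorState { DOWN, INITIALIZED }

// Hypothetical event kinds; only some of them need a group coordinator.
enum EventKind {
    COMMIT(true), FETCH_COMMITTED(true), ASSIGN(false), FETCH(false);

    final boolean needsCoordinator;

    EventKind(boolean needsCoordinator) {
        this.needsCoordinator = needsCoordinator;
    }
}

// Hypothetical helper owned by the background thread.
class CoordinatorManager {
    private CoordinatorState state = CoordinatorState.DOWN;
    private int lookups = 0; // simulated FindCoordinator attempts

    CoordinatorState state() { return state; }

    int lookups() { return lookups; }

    // Called for every event the background thread dequeues: discovery is
    // triggered lazily, only by events that actually need a coordinator.
    void onEvent(EventKind kind) {
        if (kind.needsCoordinator && state == CoordinatorState.DOWN) {
            lookups++; // stand-in for sending a FindCoordinator request
            state = CoordinatorState.INITIALIZED;
        }
    }

    // E.g. after a NOT_COORDINATOR error or a disconnect:
    // INITIALIZED may go straight back to DOWN.
    void markCoordinatorUnknown() {
        if (state == CoordinatorState.INITIALIZED) {
            state = CoordinatorState.DOWN;
        }
    }
}
```

In this sketch, a consumer that only calls `assign()` and fetches keeps `lookups()` at zero. That preserves the behavior that users like Connect may rely on.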

>
>
> > 2. Acked. I think I'll add a section about how each operation works in
> > the new implementation.
> > 3a. I will modify the existing state enum in the new implementation.
> > 3b. Ah, I must have missed this when proofreading the document. I think
> > the state transitions should be:
> >   Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked
> >   -> Join_Group -> Completed_Rebalancing -> Assigning_Partitions
> >   -> Partitions_Assigned -> Stable
> > I've made a note of that, and I'll update the document.
> >
> Got it. If we are introducing a third state for auto-committing, then upon
> completing the rebalance we may also need to transition to that state,
> since we may also revoke partitions then, right? This is for fixing
> KAFKA-14224.
>
- Instead of introducing a new state, I want to break up
PREPARING_REBALANCE into Commit and Revoke. So for your suggestion in
KAFKA-14224, I think we could potentially do it this way:
1. REVOKING_PARTITION: mute the partitions and send the callback event to
the polling thread.
2. PARTITION_REVOKED: the revocation ack has been received. Advance the
state to COMMIT.
3. COMMIT: commit the offsets and wait for the commit to finish; then we
can start the JoinGroup.
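The three steps above could be sketched as a small state machine on the background thread. This is an illustration only. The enum values mirror the names in this email rather than any existing Kafka class, the BT -> PT channel is simulated with a `Deque`, and muting is just a set of topic-partition strings.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical states: the three steps above plus the states around them.
enum RebalanceState { STABLE, REVOKING_PARTITION, PARTITION_REVOKED, COMMIT, JOIN_GROUP }

class RebalanceStateMachine {
    private RebalanceState state = RebalanceState.STABLE;
    private final Set<String> mutedPartitions = new HashSet<>();
    // Simulated BT -> PT channel carrying callback events.
    private final Deque<String> toPollingThread = new ArrayDeque<>();

    RebalanceState state() { return state; }
    Set<String> mutedPartitions() { return mutedPartitions; }
    Deque<String> toPollingThread() { return toPollingThread; }

    // Step 1: mute the partitions and ask the polling thread to run
    // onPartitionsRevoked.
    void startRevocation(String... partitions) {
        expect(RebalanceState.STABLE);
        mutedPartitions.addAll(Arrays.asList(partitions));
        toPollingThread.add("PARTITIONS_REVOKED_CALLBACK");
        state = RebalanceState.REVOKING_PARTITION;
    }

    // Step 2: the polling thread acked the revocation callback.
    void onRevocationAck() {
        expect(RebalanceState.REVOKING_PARTITION);
        state = RebalanceState.PARTITION_REVOKED;
    }

    // Step 2 -> 3: send the offset commit for the revoked partitions.
    void beginCommit() {
        expect(RebalanceState.PARTITION_REVOKED);
        state = RebalanceState.COMMIT;
    }

    // Step 3: the commit finished, so the JoinGroup can be sent.
    void onCommitCompleted() {
        expect(RebalanceState.COMMIT);
        state = RebalanceState.JOIN_GROUP;
    }

    // Reject events that arrive in an unexpected state.
    private void expect(RebalanceState required) {
        if (state != required) {
            throw new IllegalStateException("expected " + required + " but was " + state);
        }
    }
}
```

Failing loudly on out-of-order events makes ordering bugs easy to spot, e.g. a commit completion arriving before the revocation ack.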

Again, thanks. I'll add these changes to the 1-pager.

>
>
> > 4. Noted. I'll add a section about exception handling.
> > 5a. Kirk also had the same comment. I updated the document.
> > 5b. yes!
> >
> > Regarding the POC comments, I think the POC branch is actually quite
> > dated. I need to update it 😅. Do you think we can start with simple PRs
> > like skeletons of ConsumerBackgroundThread.java and the EventHandler
> > interface and implementations?
> >
> Yes, I think as long as we are introducing new classes / interfaces in the
> internal package, we can incrementally merge PRs even if they are not yet
> integrated with the public KafkaConsumer class.
>
>
> > 1. I agree the naming can be confusing, but I think the usage of response
> > and request has implications for the type of event. As you said, BT
> > (background thread) -> PT (polling thread) handles error and callback
> > events, and PT -> BT handles mostly API calls. I'll think about the
> > naming again.
> > 2. Great suggestions.
> > 3. I think I'll start with defining a few basic events, like assign,
> > commits, and fetch. Then we can modify that as we progress with the
> > project.
> >
> > Thanks,
> > P
> >
> >
> > On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Philip,
> > >
> > > Thanks for writing down the 1-pager. Just to clarify, the reason we
> > > wrote this as a 1-pager instead of a KIP is that so far all the
> > > implementations should be internal, and hence not impact anything on
> > > the public APIs. If, e.g., we find during the implementation that some
> > > public interfaces, like metrics, need to be modified, then we will send
> > > out a separate KIP proposal thread for that, right?
> > >
> > > I made a pass on the 1-pager and also on some of the ongoing
> > > developments; here are some high-level comments:
> > >
> > > On the 1-pager:
> > >
> > > 1) I think it's better to clarify the scenarios under manual
> > > assignment, a.k.a. `consumer.assign()`. E.g., does the background
> > > thread still always try to discover the coordinator even if there are
> > > no commit requests? (Note that, if yes, this would be a behavioral
> > > change, since some users like Connect may rely on the consumer never
> > > trying to discover the coordinator with manual assignment and no
> > > commits to brokers.) From the description it seems that if there are
> > > no events from the channel requesting offset commits, it would not try
> > > to discover the coordinator; but then the background thread's state
> > > would be in `down` and `initialized`, not in `stable`, and hence we
> > > should also allow a transition from `initialized` to `down` directly,
> > > right?
> > >
> > > 2) For the polling thread, besides the `poll` function changes, I
> > > think it's better to also state the changes to other blocking
> > > functions like commitSync. I'd assume e.g. commitSync would be
> > > implemented as:
> > >
> > > * Send the commit-request to the server-event channel
> > > * Continue polling from the consumer event channel, but skip other
> > > events like rebalance-listener events (we still need to bookkeep them
> > > for the next `poll` call, but we cannot trigger them since that breaks
> > > compatibility) until we receive the commit response event.
> > >
> > > Some details about how those functions would be implemented would also
> > > be very helpful for the community's audience.
> > >
> > > 3) I have a few questions from the rebalance state machine section:
> > >
> > > 3.a). you mentioned in the state machine:
> > >
> > > "Wait for the partition revoked event, and advance the state to
> > > PARTITIONS_REVOKED"
> > > "Wait for partition assignment completion from the polling thread.
> > > Advance to PARTITIONS_ASSIGNED"
> > >
> > > But we do not have those states today; I think you meant to say
> > > "PREPARE_REBALANCING" and "STABLE"?
> > >
> > > 3.b). Also, it seems that the proposed state transition would be
> > > Stable -> Revoking_Partitions -> Prepare_Rebalancing ->
> > > Complete_Rebalancing -> Assigning_Partitions -> Stable (for the eager
> > > protocol at least). But when auto-commit is enabled, we also need to
> > > commit offsets for the partitions being revoked, and the auto-commit
> > > happens before the `onPartitionsRevoked` listener is triggered, so
> > > should auto-committing be part of the `Revoking_Partitions` state as
> > > well?
> > >
> > > 4) Could you expand on the "Exceptions thrown will be different."
> > > description and list all changes to the exceptions (note that, if they
> > > do exist, we also need a KIP)? For example, besides WakeupException,
> > > Kafka also has an InterruptException (different from Java's own
> > > InterruptedException) defined on the public APIs; are we going to
> > > change which functions throw it and which do not?
> > >
> > > 5) In the testing plan:
> > >
> > > 5.a) Could you elaborate a bit more on "We need to make sure the timing
> > > of the 1. coordinator discovery and 2. joinGroup operations are being
> > > done in the correct timing."
> > >
> > > 5.b) I'd also add that all the blocking APIs, including `poll`, where a
> > > timeout value is provided either as a param or implicitly from
> > > `default.api.timeout.ms`, would now strictly respect that timeout. To
> > > me, this is also one of the key motivations of this refactoring :)
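Points 2) and 5.b) above could be combined in a sketch like the following. `commitSync` puts a commit request on the PT -> BT queue, then drains the BT -> PT queue. It sets aside rebalance-listener events for the next `poll` instead of firing them, and it never blocks past the caller's timeout.

The sketch assumes the following:
- All class names are hypothetical.
- The queues are plain `java.util.concurrent.BlockingQueue`s.
- `java.util.concurrent.TimeoutException` stands in for Kafka's own `org.apache.kafka.common.errors.TimeoutException`.
- Commit errors are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical events exchanged between the polling thread (PT) and the
// background thread (BT).
interface ConsumerEvent {}

final class CommitRequestEvent implements ConsumerEvent {
    final long requestId;
    CommitRequestEvent(long requestId) { this.requestId = requestId; }
}

final class CommitResponseEvent implements ConsumerEvent {
    final long requestId;
    CommitResponseEvent(long requestId) { this.requestId = requestId; }
}

final class RebalanceListenerEvent implements ConsumerEvent {}

class PollingThreadSketch {
    private final BlockingQueue<ConsumerEvent> requestQueue;  // PT -> BT
    private final BlockingQueue<ConsumerEvent> responseQueue; // BT -> PT
    // Events seen while blocked in commitSync; kept for the next poll().
    final List<ConsumerEvent> deferred = new ArrayList<>();
    private long nextRequestId = 0;

    PollingThreadSketch(BlockingQueue<ConsumerEvent> requestQueue,
                        BlockingQueue<ConsumerEvent> responseQueue) {
        this.requestQueue = requestQueue;
        this.responseQueue = responseQueue;
    }

    void commitSync(long timeoutMs) throws InterruptedException, TimeoutException {
        long requestId = nextRequestId++;
        requestQueue.put(new CommitRequestEvent(requestId));
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0) {
                throw new TimeoutException("commitSync timed out after " + timeoutMs + " ms");
            }
            // Never wait longer than the time left before the deadline.
            ConsumerEvent event = responseQueue.poll(remainingNs, TimeUnit.NANOSECONDS);
            if (event == null) {
                continue; // nothing arrived in time; the check above decides
            }
            if (event instanceof CommitResponseEvent
                    && ((CommitResponseEvent) event).requestId == requestId) {
                return; // our commit completed
            }
            // Do not fire listener callbacks here; bookkeep them for poll().
            deferred.add(event);
        }
    }
}
```

Computing one deadline up front and passing only the remaining time to each `poll` keeps the total wait bounded by the timeout, however many unrelated events arrive in between.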
> > >
> > > -------------------------
> > >
> > > And the current POC PRs:
> > >
> > > 1. Just a quick thought about the naming of "KafkaServerEventQueue": I
> > > think I also agree with others that it may be confusing to include
> > > `KafkaServer` in a client class. What about renaming it to
> > > `ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know
> > > that it sounds a bit awkward to have the `ResponseEventQueue` also
> > > return rebalance-listener-triggering events, but that may be less
> > > confusing.
> > >
> > > 2. I'd suggest that for new modules like `ConsumerBackgroundThread`, we
> > > first define an interface in the `internals` package, e.g.
> > > `RequestEventHandler` (assuming the previous rename suggestion), and
> > > then have a `DefaultRequestEventHandler` implementation class which
> > > encapsulates the background thread. This enables us to easily write
> > > unit tests that isolate other modules, especially with concurrent
> > > threads.
> > >
> > > 3. For `KafkaServerEventType`: where would NOOP be used? Also, I think
> > > there are other types, like FETCH_COMMITTED, as well?
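Suggestion 2 might look roughly like this. The polling thread depends only on a `RequestEventHandler` interface, and `DefaultRequestEventHandler` owns the background thread and the two queues. Several details are assumptions for illustration:
- the method names (`add`, `poll`, `close`);
- the `RequestEvent` / `ResponseEvent` classes;
- the echo behavior of the background loop.

A real background thread would drive network I/O instead of echoing.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event types for the two directions.
class RequestEvent {
    final String name;
    RequestEvent(String name) { this.name = name; }
}

class ResponseEvent {
    final String name;
    ResponseEvent(String name) { this.name = name; }
}

// The interface the polling thread programs against (internals package).
interface RequestEventHandler extends AutoCloseable {
    boolean add(RequestEvent event);   // PT -> BT, non-blocking
    Optional<ResponseEvent> poll();    // BT -> PT, non-blocking
    @Override
    void close();
}

// Default implementation that encapsulates the background thread.
class DefaultRequestEventHandler implements RequestEventHandler {
    private final BlockingQueue<RequestEvent> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<ResponseEvent> responseQueue = new LinkedBlockingQueue<>();
    private final Thread backgroundThread;
    private volatile boolean running = true;

    DefaultRequestEventHandler() {
        backgroundThread = new Thread(this::runLoop, "consumer-background-thread");
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    private void runLoop() {
        try {
            while (running) {
                RequestEvent event = requestQueue.poll(100, TimeUnit.MILLISECONDS);
                if (event != null) {
                    // Placeholder for real work: just acknowledge the event.
                    responseQueue.add(new ResponseEvent(event.name + "_ACK"));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit the loop on shutdown
        }
    }

    @Override
    public boolean add(RequestEvent event) { return requestQueue.add(event); }

    @Override
    public Optional<ResponseEvent> poll() { return Optional.ofNullable(responseQueue.poll()); }

    @Override
    public void close() {
        running = false;
        backgroundThread.interrupt();
        try {
            backgroundThread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because consumer-side code would only see the interface, a unit test could substitute a fake `RequestEventHandler` that answers synchronously. That keeps thread timing out of most tests.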
> > >
> > >
> > >
> > > Thanks,
> > > Guozhang
> > >
> > >
> > > On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Here is the proposal to refactor the Kafka Consumer
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor>.
> > > > The 1-pager is on the wiki, so please take a look at it. Also, this is
> > > > a prerequisite for KIP-848 (the next-gen rebalance protocol).
> > > >
> > > > Cheers,
> > > > P
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
