On Sun, Sep 18, 2022 at 6:03 AM Luke Chen <show...@gmail.com> wrote:

> Hi Philip,
>
> Thanks for the write-up.
>
Also thank you for taking the time to read the proposal.  Very grateful.

> Some questions:
>
> 1. You said: If we don't need a coordinator, the background thread will
> stay in the *initialized* state.
> But in the definition of *initialized*, it says:
> *initialized*: The background thread completes initialization, and the loop
> has started.
> Does that mean, even if we don't need a coordinator, we still create a
> coordinator and run the loop? And why?
>
If we don't need a coordinator, I think the background thread should not
fire a FindCoordinator request, which is what is implemented today, right?
In terms of object instantiation, I think we can instantiate the object on
demand.
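
A minimal sketch of the on-demand behavior described above. All names here
(OnDemandCoordinator, EventType, sentRequests) are hypothetical and not taken
from the proposal or the Kafka codebase; the list of sent requests merely
stands in for the network client, and the lookup is assumed to succeed.

```java
import java.util.ArrayList;
import java.util.List;

public class OnDemandCoordinator {
    // Hypothetical event types; only some of them need a group coordinator.
    enum EventType {
        ASSIGN(false), FETCH(false), COMMIT(true);

        final boolean requiresCoordinator;

        EventType(boolean requiresCoordinator) {
            this.requiresCoordinator = requiresCoordinator;
        }
    }

    private boolean coordinatorKnown = false;
    // Records the requests that would be sent; stands in for the network client.
    final List<String> sentRequests = new ArrayList<>();

    // Handle one event from the channel. FindCoordinator is only issued the
    // first time an event actually requires a coordinator.
    void handle(EventType event) {
        if (event.requiresCoordinator && !coordinatorKnown) {
            sentRequests.add("FindCoordinator");
            coordinatorKnown = true; // assumption: the lookup succeeds
        }
        sentRequests.add(event.name());
    }

    public static void main(String[] args) {
        OnDemandCoordinator bt = new OnDemandCoordinator();
        bt.handle(EventType.ASSIGN);
        bt.handle(EventType.FETCH);
        System.out.println(bt.sentRequests); // [ASSIGN, FETCH]
        bt.handle(EventType.COMMIT);
        System.out.println(bt.sentRequests); // [ASSIGN, FETCH, FindCoordinator, COMMIT]
    }
}
```

With manual assignment and no commits, no FindCoordinator request is ever
recorded, which matches the behavior Connect is said to rely on.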

>
> 2. Why do we need so many rebalance states?
> After this proposal, we'll change the state number from 4 to 8.
> And we will have many state changes and state checks in the code. Could you
> explain why those states are necessary?
> I can imagine that a method like `rebalanceInProgress` will need to check
> whether the current state is one of
>
> PREPARE_REVOCATION/REVOKING_PARTITION/PARTITION_REVOKED/PREPARING_REBALANCE/.../...
> .
> So, maybe you should explain why these states are necessary. For example, I
> don't understand why we need 2 separate states for
> PREPARE_REVOCATION/REVOKING_PARTITION. Any reason there?
>
I think we need to do 3 things when revoking a partition, corresponding to
before, during, and after the revocation:
  1. Before: prepare the revocation, i.e., pause the data, send out
commits (as of the current implementation), and send an event to the
polling thread to invoke the callback.
  2. During: wait for the callback to be triggered.  Meanwhile, the
background thread will continue to poll the channel until the ack event
arrives.
  3. After: once the invocation ack event is received, we move on to
requesting to join the group.

Maybe you are right; perhaps we don't need the PARTITION_REVOKED state.
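
The three steps above can be sketched as a small state machine. This is an
illustration only: the class, the two in-memory queues, and the
REVOCATION_ACK event name are hypothetical, the state names are borrowed from
this thread, and it runs single-threaded so that it stays deterministic.
Pausing and committing are omitted.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RevocationSketch {
    enum State { STABLE, PREPARE_REVOCATION, REVOKING_PARTITION, PREPARING_REBALANCE }

    State state = State.STABLE;
    // Background thread -> polling thread (callback requests).
    final Queue<String> toPollingThread = new ArrayDeque<>();
    // Polling thread -> background thread (acks).
    final Queue<String> toBackgroundThread = new ArrayDeque<>();

    void startRevocation() {
        state = State.PREPARE_REVOCATION;
    }

    // One iteration of the (hypothetical) background thread loop.
    void runOnce() {
        switch (state) {
            case PREPARE_REVOCATION: {
                // Before: pause and commit would go here (omitted), then ask
                // the polling thread to invoke the revocation callback.
                toPollingThread.add("REVOKE_PARTITION");
                state = State.REVOKING_PARTITION;
                break;
            }
            case REVOKING_PARTITION: {
                // During: keep polling the channel until the ack arrives.
                String event = toBackgroundThread.poll();
                if ("REVOCATION_ACK".equals(event)) {
                    // After: move on to joining the group.
                    state = State.PREPARING_REBALANCE;
                }
                break;
            }
            default:
                break;
        }
    }

    public static void main(String[] args) {
        RevocationSketch bt = new RevocationSketch();
        bt.startRevocation();
        bt.runOnce();                  // before -> during
        bt.runOnce();                  // no ack yet, state is unchanged
        System.out.println(bt.state);  // REVOKING_PARTITION
        bt.toPollingThread.poll();     // polling thread takes the event, runs the callback
        bt.toBackgroundThread.add("REVOCATION_ACK");
        bt.runOnce();                  // ack received -> after
        System.out.println(bt.state);  // PREPARING_REBALANCE
    }
}
```

In this sketch the ack moves the state machine straight to
PREPARING_REBALANCE, so no separate PARTITION_REVOKED state is needed.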

>
> 3. How do we handle a subscription state change in the poll thread that
> happens before the REVOKE_PARTITION/ASSIGN_PARTITION events arrive?
> Suppose the background thread sends a REVOKE_PARTITION event to the poll
> thread, and before handling it, the consumer updates the subscription.
> In this situation, will we still invoke the revoke callback or not?
> This can't happen in the current design because they are handled in one thread.
>

Thanks for bringing this to the table. Would it make sense to lock the
subscriptionState in scenarios like this? That would require API changes,
such as throwing an exception to tell users that a revocation is in
progress.
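
One possible shape for this idea, purely as a sketch. GuardedSubscription and
its methods are hypothetical and are not the existing SubscriptionState API;
the choice of IllegalStateException is also an assumption, since the actual
exception type would need to be decided (and likely go through a KIP).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class GuardedSubscription {
    private final Set<String> topics = new HashSet<>();
    private boolean revocationInProgress = false;

    // Called (hypothetically) when the background thread starts a revocation.
    public synchronized void beginRevocation() {
        revocationInProgress = true;
    }

    // Called (hypothetically) once the revocation callback has been acked.
    public synchronized void endRevocation() {
        revocationInProgress = false;
    }

    // Rejects subscription changes while a revocation is in progress.
    public synchronized void subscribe(Set<String> newTopics) {
        if (revocationInProgress) {
            throw new IllegalStateException(
                "Cannot change the subscription while partitions are being revoked");
        }
        topics.clear();
        topics.addAll(newTopics);
    }

    public synchronized Set<String> subscription() {
        return Collections.unmodifiableSet(new HashSet<>(topics));
    }

    public static void main(String[] args) {
        GuardedSubscription sub = new GuardedSubscription();
        sub.subscribe(Collections.singleton("topic-a"));
        sub.beginRevocation();
        try {
            sub.subscribe(Collections.singleton("topic-b"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        sub.endRevocation();
        sub.subscribe(Collections.singleton("topic-b"));
        System.out.println(sub.subscription()); // [topic-b]
    }
}
```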

>
> 4. *Is there a better way to configure session interval and heartbeat
> interval?*
> These configs are moved to the broker side in KIP-848 IIRC.
> Maybe you can check that KIP and update here.
>
Thanks.

>
> Some typos and hard-to-understand parts:
>
Hmm, terribly sorry about these typos.  It turns out I didn't publish the
updated copy.  I will address them and update the document right away.


> 5. Firstly, it _mcomplexfixes_ increasingly difficult ???
> 6. it simplifies the _pd_ design ? What is pd?
> 7. In "Background thread and its lifecycle" section, I guess the 2 points
> should be 3c and 3d, right?
> That is:
> a. Check if there is an in-flight event. If not, poll for the new events
> from the channel.
> b. Run the state machine, and here are the following scenario:
> c. Poll the networkClient.
> d. Backoff for retryBackoffMs milliseconds.
>
> Thank you.
>
Again, I'm really grateful to you for reviewing the proposal, so thank you!

P

> Luke
>
> On Sat, Sep 17, 2022 at 6:29 AM Philip Nee <philip...@gmail.com> wrote:
>
> > On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Philip,
> > >
> > > On Fri, Sep 16, 2022 at 1:20 PM Philip Nee <philip...@gmail.com>
> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thank you for the reviews, and I agree with your suggestions: If we
> > find
> > > > any behavior changes, we will issue a KIP. Is it possible to first
> > > publish
> > > > the new implementation in a separate class so that we don't disrupt
> the
> > > > existing implementation? Then we can issue a KIP to address the
> > > > regressions.
> > > >
> > > > Yes! That's what I meant to clarify, just in case other people are
> > > confused about why we did not propose a KIP right away, but first wrote a
> > one-pager.
> > >
> > >
> > > > To your questions:
> > > > 1. I think the coordinator discovery will happen on demand, i.e., the
> > > > background thread will only try to discover a coordinator when the
> > event
> > > > (e.g., commit) requires one. I've noted that, and I'll make that clear
> > in
> > > > the proposal.
> > > >
> > > Thanks. Could you then clarify that for the coordinator state
> transition?
> > > Is that just between `down` and `initialized`, and then `initialized`
> can
> > > then transition back to `down` too?
> > >
> >
> > Will do, I'll update the 1 pager to clarify it.
> >
> > >
> > >
> > > > 2. Acked. I think I'll add a section about how each operation works
> in
> > > the
> > > > new implementation.
> > >
> > > 3a. I will modify the existing state enum in the new implementation.
> > > > 3b. Ah, I must have missed this when proofreading the document. I
> think
> > > the
> > > > state should be:
> > > >   Unjoined -> Commit_async -> Revoking_Partitions ->
> Partitions_Revoked
> > > ->
> > > > Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
> > > > Partitions_Assigned -> Stable
> > > >   I've made a note of that, and I'll update the document.
> > > >
> > > Got it, if we are introducing a third state for auto committing, then
> > upon
> > > completing the rebalance, we may also need to transition to that state
> since
> > > we may also revoke partitions, right? This is for fixing KAFKA-14224
> > >
> > - Instead of introducing a new state, I think I want to break up the
> > PREPARING_REBALANCE into Commit and Revoke.  So I think for your
> suggestion
> > in 14224, I think we could potentially do it this way.
> > 1. REVOKING_PARTITION: mute the partition and send the callback event to
> > the polling thread
> > 2. PARTITION_REVOKED: Received the revocation ack.  Advance the state to
> > COMMIT.
> > 3. COMMIT: Commit the offsets.  Wait for the commit to finish, then we
> can
> > start the join group.
> >
> > Again, thanks, I'll update these changes in the 1pager.
> >
> > >
> > >
> > > > 4. Noted. I'll add a section about exception handling.
> > > > 5a.  Kirk also had the same comment. I updated the document.
> > > > 5b. yes!
> > > >
> > > > Regarding the POC comments, I think the POC branch is actually quite
> > > dated.
> > > > I need to update it 😅. Do you think we can start with simple PRs
> like
> > > > skeletons of ConsumerBackgroundThread.java and EventHandler interface
> > and
> > > > implementations?
> > > >
> > > > > Yes, I think as long as we are introducing new classes / interfaces in
> the
> > > internal package, we can incrementally merge PRs even if they are not
> > > integrated with the public KafkaConsumer class yet.
> > >
> > >
> > > > 1. I agree the naming can be confusing, but I think the usage of
> > response
> > > > and request has implications for the type of event. As you said, BT
> ->
> > PT
> > > > handles error and callback events, and PT -> BT handles mostly API
> > calls.
> > > > I'll think again about the naming.
> > > > 2. Great suggestions.
> > > > 3. I think I'll start with defining a few basic events, like assign,
> > > > commits, and fetch. Then we can modify that as we progress with the
> > > > project.
> > > >
> > > > Thanks,
> > > > P
> > > >
> > > >
> > > > On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Philip,
> > > > >
> > > > > Thanks for writing down the 1-pager. Just to clarify, the reason we
> > > wrote
> > > > > this as a 1-pager instead of a KIP is that so far all the
> > > implementations
> > > > > should be internal, and hence not impacting anything on the public
> > > APIs.
> > > > > If, e.g. we found along with the implementation that some public
> > > > interfaces
> > > > > like metrics, need to be modified, then we will send out a separate
> > > > thread
> > > > > for that in a KIP proposal, right?
> > > > >
> > > > > I made a pass on the 1-pager and also some of the ongoing
> > developments,
> > > > > here are just some high-level comments:
> > > > >
> > > > > On the 1-pager:
> > > > >
> > > > > 1) I think it's better to clarify the scenarios under manual
> > assignment
> > > > > a.k.a. `consumer.assign()`. E.g. does the background thread still
> > > always
> > > > > > try to discover the coordinator even if there are no commit requests
> > (note
> > > > > that, if yes, this would be a behavioral change since some users
> like
> > > > > Connect may rely on the consumer to not ever try to discover the
> > > > > coordinator with manual assignment and no commit to brokers)? From
> > the
> > > > > description it seems if there's no events from the channel to
> request
> > > > > committing offsets, it would not try to discover coordinator, but
> > then
> > > > the
> > > > > > background thread's state would be in `down` or
> `initialized`,
> > > not
> > > > > in `stable`, and hence we should also allow transition from
> > > `initialized`
> > > > > to `down` directly, right?
> > > > >
> > > > > 2) From the polling thread, besides the `poll` function changes, I
> > > think
> > > > > it's better to also state other blocking function changes like
> > > commitSync
> > > > > as well. I'd assume e.g. for commitSync it would be implemented as:
> > > > >
> > > > > * Send the commit-request to the server-event channel
> > > > > * Continue polling from the consumer event channel, but skip other
> > > events
> > > > > like rebalance-listener (we still need to bookkeep it for the next
> > > `poll`
> > > > > call, but we just cannot trigger them since that breaks
> > compatibility)
> > > > > > until we receive the commit response event.
> > > > >
> > > > > Some details about how those functions would be implemented would
> > also
> > > be
> > > > > very helpful for the community's audience.
> > > > >
> > > > > 3) I have a few questions from the rebalance state machine section:
> > > > >
> > > > > 3.a). you mentioned in the state machine:
> > > > >
> > > > > "Wait for the partition revoked event, and advance the state to
> > > > > PARTITIONS_REVOKED"
> > > > > "Wait for partition assignment completion from the polling thread.
> > > > Advance
> > > > > to PARTITIONS_ASSIGNED"
> > > > >
> > > > > But we do not have those states today, I think you meant to say
> > > > > "PREPARE_REBALANCING" and "STABLE"?
> > > > >
> > > > > 3.b). Also, it seems that the proposed state transition would be
> > Stable
> > > > ->
> > > > > Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing
> ->
> > > > > Assigning_Partitions -> Stable (for eager protocol at least), but
> > when
> > > > auto
> > > > > commit is enabled, we also need to commit offsets for those
> revoking
> > > > > partitions, and the auto-committing happens before the
> > > > > `onPartitionsRevoked` listener is triggered, so should
> > auto-committing
> > > be
> > > > > part of the `Revoking_Partitions` state as well?
> > > > >
> > > > > 4) Could you expand on the "Exceptions thrown will be different."
> > > > > description and list all changes to the exceptions (note that, if
> > they
> > > do
> > > > > exist, we also need a KIP). For example, besides WakeupException,
> > Kafka
> > > > > > also has an InterruptException (different from Java's own
> > > > > InterruptedException) defined on the public APIs, are we going to
> > > change
> > > > > which functions would throw and which will not?
> > > > >
> > > > > 5) In the testing plan:
> > > > >
> > > > > 5.a) Could you elaborate a bit more on "We need to make sure the
> > timing
> > > > of
> > > > > the 1.  coordinator discovery and 2.  joinGroup operations are
> being
> > > done
> > > > > in the correct timing."
> > > > >
> > > > > 5.b) I'd also add that for all the blocking APIs including `poll`
> > > where a
> > > > > timeout value is provided either as param, or implicitly from `
> > > > > default.api.timeout.ms` they would now be strictly respected ---
> to
> > me
> > > > > this
> > > > > is also one of the key motivations of this refactoring :)
> > > > >
> > > > > -------------------------
> > > > >
> > > > > And the current POC PRs:
> > > > >
> > > > > 1. Just a quick thought about the naming of
> "KafkaServerEventQueue":
> > I
> > > > > think I also agree with others that it may be confusing to include
> > > > > `KafkaServer` on a client class, what about renaming it to
> > > > > > `ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I
> know
> > > that
> > > > > > it sounds a bit awkward to have the `ResponseEventQueue` also
> > return
> > > > > rebalance-listener-triggering events, but that may be less
> confusing.
> > > > >
> > > > > 2. I'd suggest for new modules like `ConsumerBackgroundThread`, we
> > > first
> > > > > > define an interface in the `internals` package, e.g.
> > > > `RequestEventHandler`
> > > > > (assuming the previous rename suggestion), and then have a
> > > > > > `DefaultRequestEventHandler` implementation class which encapsulates
> > the
> > > > > background thread. This enables us to easily write unit tests that
> > > > isolate
> > > > > > other modules, especially with concurrent threads.
> > > > >
> > > > > > 3. For `KafkaServerEventType`: where would NOOP be used? Also I
> > > think
> > > > > there are other types, like FETCH_COMMITTED as well?
> > > > >
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Here is the proposal to refactor the Kafka Consumer
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
> > > > > > >.
> > > > > > The 1-pager is on the wiki, so please take a look at it.  Also,
> > this
> > > > is a
> > > > > > prerequisite for KIP-848 (the next gen rebalance protocol).
> > > > > >
> > > > > > Cheers,
> > > > > > P
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
