Hi all!

I've added the "KIP-848" and "ctr" tags to the relevant PRs.

see: https://github.com/apache/kafka/pulls?q=label%3Actr+sort%3Acreated-asc

On Tue, May 9, 2023 at 10:29 AM Philip Nee <philip...@gmail.com> wrote:

> Hey Divij,
>
> Firstly, THANK YOU for reaching out and providing the feedback.  At this
> moment, I don't have all of the answers to your questions at the moment,
> but I hope my responses could be helpful in moving the project forward.
>
> 1. There isn't a prioritization strategy, and the network request buffer
> is just FIFO. Basically, all requests are buffered in the network queue and
> sent out at the end of the event loop where we poll the client.
> Presumably, your concern is that the member can get constantly kicked out
> of the group due to not sending out a heartbeat soon enough under high load
> or network congestion.  I think that's a fair point.  On the other hand, we
> design the background thread (or we would like the design to be...) to be
> lightweight and focus on network communication, such that the client can
> quickly go through all the requests without consuming too many resources.
> In the case of network congestion, I think there could be other reasons
> that cause members to fail, so I don't know if we need the prioritize the
> requests.
>
> 2. Yes, as of now, the fetch offset request is being de-duped, and we
> would do that for other requests as well.  There's also a RequestState
> class that helps request managers track retry backoff (exponential
> backoff), which prevents members from spamming brokers.
>
> 3. There's no implementation of the backpressure for both the application
> and background event queue, but here is my thought:
>  - For ApplicationEventQueue: I can think of a few ways to implement this.
> 1. user handles a RetriableError and re-enqueues the event. or 2. Having
> the application to buffer the events, but then we are just implementing a
> queue on top of the queue.  What's your suggestion here?
>  - For BackgroundEventQueue: The main events are fetches, errors, and
> rebalance triggers.  For fetching, what's in my mind is some kind of memory
> bound mechanism like KIP-81
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer>
>  (it
> hasn't been implemented).  For rebalancing events or errors, there is not
> much we could do aside from pausing the request managers.  What do you
> think?
>
> 4. See above.
>
> 5. I'll add an Observability section to the one-pager. My thinking is
> similar to yours: 1. memory and CPU pressure and 2. event latency.
>
> I would love to incorporate some of your past experience from TinkerPop
> into this design, so thank you again for sharing your feedback!
>
> Thank you!
> P
>
> On Tue, May 9, 2023 at 9:50 AM Divij Vaidya <divijvaidy...@gmail.com>
> wrote:
>
>> Hey Philip
>>
>> Thank you for writing this down.
>>
>> Overall, the ideal of decoupling the responsibility amongst threads and
>> communication using queues looks good. I have a few questions
>> associated with handling failure scenarios.
>>
>> 1. I observed that there is no concept of prioritization in the background
>> thread. In case of high load, should a heartbeat handler be prioritized
>> over listOffset handler?
>> 2. Idempotent requests could be "bulked" together, e.g. if we have two
>> list
>> offsets in the queue, we should be able to call listOffset once and
>> duplicate the response for the requests
>> 3. What would the back pressure handling behaviour from the
>> applicationEventQueue look like? This is bound to occur during
>> intermittent
>> network disruption
>> 4. Same as 3 but for backgroundEventQueue. If the client thread is doing
>> some I/O since the user is blocking it in their business logic, then the
>> queue may get full. What happens then? How do we recover and cancel the
>> resources such as waiting sync APIs?
>> 5. Can we add observability/metrics in the design? As an example, the end
>> user might measure latency from the client thread but the request may
>> actually be spending time in the queue waiting to be picked by the
>> background thread. In such cases, user should have the visibility to
>> identify that a need to scale the background thread processing is
>> required.
>>
>> --
>> Divij Vaidya
>>
>>
>>
>> On Sun, Sep 18, 2022 at 11:24 PM Philip Nee <philip...@gmail.com> wrote:
>>
>> > On Sun, Sep 18, 2022 at 6:03 AM Luke Chen <show...@gmail.com> wrote:
>> >
>> > > Hi Philip,
>> > >
>> > > Thanks for the write-up.
>> > >
>> > Also thank you for taking the time to read the proposal.  Very grateful.
>> >
>> > > Some questions:
>> > >
>> > > 1. You said: If we don't need a coordinator, the background thread
>> will
>> > > stay in the *initialized* state.
>> > > But in the definition of *initialized, *it said:
>> > > *initialized*: The background thread completes initialization, and the
>> > loop
>> > > has started.
>> > > Does that mean, even if we don't need a coordinator, we still create a
>> > > coordinator and run the loop? And why?
>> > >
>> > If we don't need a coordinator, I think the background thread should not
>> > fire a FindCoordinator request, which is what is implemented today
>> right?
>> > In terms of object instantiation, I think we can instantiate the object
>> on
>> > demand.
>> >
>> > >
>> > > 2. Why do we need so many rebalance states?
>> > > After this proposal, we'll change the state number from 4 to 8.
>> > > And we will have many state changes and state check in the code. Could
>> > you
>> > > explain why those states are necessary?
>> > > I can imagine, like the method: `rebalanceInProgress`, will need to
>> check
>> > > if the current state in
>> > >
>> > >
>> >
>> PREPARE_REVOCATION/REVOKING_PARTITION/PARTITION_REVOKED/PREPARING_REBALANCE/.../...
>> > > .
>> > > So, maybe you should explain why these states are necessary. To me,
>> like
>> > > PREPARE_REVOCATION/REVOKING_PARTITION, I don't understand why we need
>> 2
>> > > states for them? Any reason there?
>> > >
>> > I think we need to do 3 things when revoking a partition: There are,
>> before
>> > the revocation, during the revocation, and after the revocation.
>> >   1. Before: prepare the revocation, i.e., pausing the data, send out
>> > commits (as of the current implementation), and send an event to the
>> > polling thread to invoke the callback
>> >   2. During: Waiting for the callback to be triggered.  Meanwhile, the
>> > background thread will continue to poll the channel, until the ack event
>> > arrives
>> >   3. After: Once the invocation ack event is received, we move on to
>> > request to join the group.
>> >
>> > Maybe you are right, perhaps, we don't need the PARTITION_REVOKED state.
>> >
>> > >
>> > > 3. How could we handle the Subscription state change in poll thread
>> > earlier
>> > > than REVOKE_PARTITION/ASSIGN_PARTITION events arrived?
>> > > Suppose, background thread sending REVOKE_PARTITION event to poll
>> thread,
>> > > and before handing it, the consumer updates the subscription.
>> > > So in this situation, we'll still invoke revoke callback or not?
>> > > This won't happen in current design because they are handing in one
>> > thread.
>> > >
>> >
>> > Thanks for bringing this to the table. Would it make sense to lock the
>> > subscriptionState in some scenarios like this? In this case, this
>> requires
>> > API changes, like throwing an exception to tell the users that the
>> > revocation is happening.
>> >
>> > >
>> > > 4. *Is there a better way to configure session interval and heartbeat
>> > > interval?*
>> > > These configs are moved to broker side in KIP-848 IIRC.
>> > > Maybe you can check that KIP and update here.
>> > >
>> > Thanks.
>> >
>> > >
>> > > Some typos and hard to understand
>> > >
>> > Hmm, terribly sorry about these typos.  Turned out I didn't publish the
>> > updated copy.  Will address them and update the document right away.
>> >
>> >
>> > > 5. Firstly, it _mcomplexfixes_ increasingly difficult ???
>> > > 6. it simplifies the _pd_ design ? What is pd?
>> > > 7. In "Background thread and its lifecycle" section, I guess the 2
>> points
>> > > should be 3c and 3d, right?
>> > > That is:
>> > > a. Check if there is an in-flight event. If not, poll for the new
>> events
>> > > from the channel.
>> > > b. Run the state machine, and here are the following scenario:
>> > > c. Poll the networkClient.
>> > > d. Backoff for retryBackoffMs milliseconds.
>> > >
>> > > Thank you.
>> > >
>> > Again, really grateful for you to review the proposal, so thank you!
>> >
>> > P
>> >
>> > > Luke
>> > >
>> > > On Sat, Sep 17, 2022 at 6:29 AM Philip Nee <philip...@gmail.com>
>> wrote:
>> > >
>> > > > On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi Philip,
>> > > > >
>> > > > > On Fri, Sep 16, 2022 at 1:20 PM Philip Nee <philip...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > Hi Guozhang,
>> > > > > >
>> > > > > > Thank you for the reviews, and I agree with your suggestions:
>> If we
>> > > > find
>> > > > > > any behavior changes, we will issue a KIP. Is it possible to
>> first
>> > > > > publish
>> > > > > > the new implementation in a separate class so that we don't
>> disrupt
>> > > the
>> > > > > > existing implementation? Then we can issue a KIP to address the
>> > > > > > regressions.
>> > > > > >
>> > > > > > Yes! That's what I meant to clarify. Just in case other people
>> may
>> > be
>> > > > > confused why we did not propose a KIP right away, but first as a
>> > > > one-pager.
>> > > > >
>> > > > >
>> > > > > > To your questions:
>> > > > > > 1. I think the coordinator discovery will happen on demand,
>> i.e.,
>> > the
>> > > > > > background thread will only try to discover a coordinator when
>> the
>> > > > event
>> > > > > > (e.g., commit) requires one. I've noted that, and I'll make sure
>> > that
>> > > > in
>> > > > > > the proposal.
>> > > > > >
>> > > > > Thanks. Could you then clarify that for the coordinator state
>> > > transition?
>> > > > > Is that just between `down` and `initialized`, and then
>> `initialized`
>> > > can
>> > > > > then transition back to `down` too?
>> > > > >
>> > > >
>> > > > Will do, I'll update the 1 pager to clarify it.
>> > > >
>> > > > >
>> > > > >
>> > > > > > 2. Acked. I think I'll add a section about how each operation
>> works
>> > > in
>> > > > > the
>> > > > > > new implementation.
>> > > > >
>> > > > > 3a. I will modify the existing state enum in the new
>> implementation.
>> > > > > > 3b. Ah, I must have missed this when proofreading the document.
>> I
>> > > think
>> > > > > the
>> > > > > > state should be:
>> > > > > >   Unjoined -> Commit_async -> Revoking_Partitions ->
>> > > Partitions_Revoked
>> > > > > ->
>> > > > > > Join_Group -> Completed_Rebalancing -> Assinging_Partitions ->
>> > > > > > Partitions_Assigned -> Stable
>> > > > > >   I've made a note of that, and I'll update the document.
>> > > > > >
>> > > > > Got it, if we are introducing a third state for auto committing,
>> then
>> > > > upon
>> > > > > completing the rebalance, we may also need to transit to that
>> state
>> > > since
>> > > > > we may also revoke partitions, right? This is for fixing
>> KAFKA-14224
>> > > > >
>> > > > - Instead of introducing a new state, I think I want to break up the
>> > > > PREPARING_REBALANCE into Commit and Revoke.  So I think for your
>> > > suggestion
>> > > > in 14224, I potentially we could do it this way.
>> > > > 1. REVOKING_PARTITION: mute the partition and send the callback
>> event
>> > to
>> > > > the polling thread
>> > > > 2. PARTITION_REVOKED: Received the revocation ack.  Advance the
>> state
>> > to
>> > > > COMMIT.
>> > > > 3. COMMIT: Commit the offsets.  Wait for the commit to finish, then
>> we
>> > > can
>> > > > start the join group.
>> > > >
>> > > > Again, thanks, I'll update these changes in the 1pager.
>> > > >
>> > > > >
>> > > > >
>> > > > > > 4. Noted. I'll add a section about exception handling.
>> > > > > > 5a.  Kirk also had the same comment. I updated the document.
>> > > > > > 5b. yes!
>> > > > > >
>> > > > > > Regarding the POC comments, I think the POC branch is actually
>> > quite
>> > > > > dated.
>> > > > > > I need to update it 😅. Do you think we can start with simple
>> PRs
>> > > like
>> > > > > > skeletons of ConsumerBackgroundThread.java and EventHandler
>> > interface
>> > > > and
>> > > > > > implementations?
>> > > > > >
>> > > > > > Yes, I think as long as we are introducing new classes /
>> interfaces
>> > > the
>> > > > > internal package, we can incrementally merge PRs even if they are
>> not
>> > > > > integrated with the public KafkaConsumer class yet.
>> > > > >
>> > > > >
>> > > > > > 1. I agree the naming can be confusing, but I think the usage of
>> > > > response
>> > > > > > and request has implications for the type of event. As you
>> said, BT
>> > > ->
>> > > > PT
>> > > > > > handles error and callback events, and PT -> BT handles mostly
>> API
>> > > > calls.
>> > > > > > I'll think again about these naming.
>> > > > > > 2. Great suggestions.
>> > > > > > 3. I think I'll start with defining a few basic events, like
>> > assign,
>> > > > > > commits, and fetch. Then we can modify that as we progress with
>> the
>> > > > > > project.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > P
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <
>> wangg...@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hello Philip,
>> > > > > > >
>> > > > > > > Thanks for writing down the 1-pager. Just to clarify, the
>> reason
>> > we
>> > > > > wrote
>> > > > > > > this as a 1-pager instead of a KIP is that so far all the
>> > > > > implementations
>> > > > > > > should be internal, and hence not impacting anything on the
>> > public
>> > > > > APIs.
>> > > > > > > If, e.g. we found along with the implementation that some
>> public
>> > > > > > interfaces
>> > > > > > > like metrics, need to be modified, then we will send out a
>> > separate
>> > > > > > thread
>> > > > > > > for that in a KIP proposal, right?
>> > > > > > >
>> > > > > > > I made a pass on the 1-pager and also some of the ongoing
>> > > > developments,
>> > > > > > > here are just some high-level comments:
>> > > > > > >
>> > > > > > > On the 1-pager:
>> > > > > > >
>> > > > > > > 1) I think it's better to clarify the scenarios under manual
>> > > > assignment
>> > > > > > > a.k.a. `consumer.assign()`. E.g. does the background thread
>> still
>> > > > > always
>> > > > > > > tries to discover coordinator even if there's no commit
>> requests
>> > > > (note
>> > > > > > > that, if yes, this would be a behavioral change since some
>> users
>> > > like
>> > > > > > > Connect may rely on the consumer to not ever try to discover
>> the
>> > > > > > > coordinator with manual assignment and no commit to brokers)?
>> > From
>> > > > the
>> > > > > > > description it seems if there's no events from the channel to
>> > > request
>> > > > > > > committing offsets, it would not try to discover coordinator,
>> but
>> > > > then
>> > > > > > the
>> > > > > > > background thread's state would would be in `down` and
>> > > `initialized`,
>> > > > > not
>> > > > > > > in `stable`, and hence we should also allow transition from
>> > > > > `initialized`
>> > > > > > > to `down` directly, right?
>> > > > > > >
>> > > > > > > 2) From the polling thread, besides the `poll` function
>> changes,
>> > I
>> > > > > think
>> > > > > > > it's better to also state other blocking function changes like
>> > > > > commitSync
>> > > > > > > as well. I'd assume e.g. for commitSync it would be
>> implemented
>> > as:
>> > > > > > >
>> > > > > > > * Send the commit-request to the server-event channel
>> > > > > > > * Continue polling from the consumer event channel, but skip
>> > other
>> > > > > events
>> > > > > > > like rebalance-listener (we still need to bookkeep it for the
>> > next
>> > > > > `poll`
>> > > > > > > call, but we just cannot trigger them since that breaks
>> > > > compatibility)
>> > > > > > > until we received the commit response event.
>> > > > > > >
>> > > > > > > Some details about how those functions would be implemented
>> would
>> > > > also
>> > > > > be
>> > > > > > > very helpful for the community's audience.
>> > > > > > >
>> > > > > > > 3) I have a few questions from the rebalance state machine
>> > section:
>> > > > > > >
>> > > > > > > 3.a). you mentioned in the state machine:
>> > > > > > >
>> > > > > > > "Wait for the partition revoked event, and advance the state
>> to
>> > > > > > > PARTITIONS_REVOKED"
>> > > > > > > "Wait for partition assignment completion from the polling
>> > thread.
>> > > > > > Advance
>> > > > > > > to PARTITIONS_ASSIGNED"
>> > > > > > >
>> > > > > > > But we do not have those states today, I think you meant to
>> say
>> > > > > > > "PREPARE_REBALANCING" and "STABLE"?
>> > > > > > >
>> > > > > > > 3.b). Also, it seems that the proposed state transition would
>> be
>> > > > Stable
>> > > > > > ->
>> > > > > > > Revoking_Partitions -> Prepare_Rebalancing ->
>> > Complete_Rebalancing
>> > > ->
>> > > > > > > Assigning_Partitions -> Stable (for eager protocol at least),
>> but
>> > > > when
>> > > > > > auto
>> > > > > > > commit is enabled, we also need to commit offsets for those
>> > > revoking
>> > > > > > > partitions, and the auto-committing happens before the
>> > > > > > > `onPartitionsRevoked` listener is triggered, so should
>> > > > auto-committing
>> > > > > be
>> > > > > > > part of the `Revoking_Partitions` state as well?
>> > > > > > >
>> > > > > > > 4) Could you expand on the "Exceptions thrown will be
>> different."
>> > > > > > > description and list all changes to the exceptions (note
>> that, if
>> > > > they
>> > > > > do
>> > > > > > > exist, we also need a KIP). For example, besides
>> WakeupException,
>> > > > Kafka
>> > > > > > > also have a InterruptException (different from Java's own
>> > > > > > > InterruptedException) defined on the public APIs, are we
>> going to
>> > > > > change
>> > > > > > > which functions would throw and which will not?
>> > > > > > >
>> > > > > > > 5) In the testing plan:
>> > > > > > >
>> > > > > > > 5.a) Could you elaborate a bit more on "We need to make sure
>> the
>> > > > timing
>> > > > > > of
>> > > > > > > the 1.  coordinator discovery and 2.  joinGroup operations are
>> > > being
>> > > > > done
>> > > > > > > in the correct timing."
>> > > > > > >
>> > > > > > > 5.b) I'd also add that for all the blocking APIs including
>> `poll`
>> > > > > where a
>> > > > > > > timeout value is provided either as param, or implicitly from
>> `
>> > > > > > > default.api.timeout.ms` they would now be strictly respected
>> ---
>> > > to
>> > > > me
>> > > > > > > this
>> > > > > > > is also one of the key motivations of this refactoring :)
>> > > > > > >
>> > > > > > > -------------------------
>> > > > > > >
>> > > > > > > And the current POC PRs:
>> > > > > > >
>> > > > > > > 1. Just a quick thought about the naming of
>> > > "KafkaServerEventQueue":
>> > > > I
>> > > > > > > think I also agree with others that it may be confusing to
>> > include
>> > > > > > > `KafkaServer` on a client class, what about renaming it to
>> > > > > > > "ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`?
>> I
>> > > know
>> > > > > that
>> > > > > > > it sounds a bit awkward to have the `ResponseEventQueue` to
>> also
>> > > > return
>> > > > > > > rebalance-listener-triggering events, but that may be less
>> > > confusing.
>> > > > > > >
>> > > > > > > 2. I'd suggest for new modules like
>> `ConsumerBackgroundThread`,
>> > we
>> > > > > first
>> > > > > > > defines an interface in the `internals` package, e.g.
>> > > > > > `RequestEventHandler`
>> > > > > > > (assuming the previous rename suggestion), and then have a
>> > > > > > > `DefaultRequestEventHandler` implementation class which
>> > encapsulate
>> > > > the
>> > > > > > > background thread. This enables us to easily write unit tests
>> > that
>> > > > > > isolate
>> > > > > > > other modules especially with concurrent threadings.
>> > > > > > >
>> > > > > > > 3. For `KafkaServerEventType`: where would NOOP being used?
>> Also
>> > I
>> > > > > think
>> > > > > > > there are other types, like FETCH_COMMITTED as well?
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > >
>> > > > > > > On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <
>> philip...@gmail.com>
>> > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > Here is the proposal to refactor the Kafka Consumer
>> > > > > > > > <
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
>> > > > > > > > >.
>> > > > > > > > The 1-pager is on the wiki, so please take a look at it.
>> Also,
>> > > > this
>> > > > > > is a
>> > > > > > > > prerequisite for KIP-848 (the next gen rebalance protocol).
>> > > > > > > >
>> > > > > > > > Cheers,
>> > > > > > > > P
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to