Hey Divij,

Sorry for the late reply - I was away for an extended period, but
I'm back to address some of the questions here.

To your point, we will incorporate your suggestions into the 1-pager, as
well as the future KIP-945 (we decided to create a KIP for this work for
better visibility).

Here is my feedback on your suggestions; I will respond to
each of the bullet points above:
1. In the current design, the background thread and the main thread handle
different types of tasks, and the background thread should continue making
the system calls even if the user thread is under high load.  However,
we have not implemented prioritization, because all calls should be
sent out by the end of each event loop.  Since the background thread is
meant to be fairly lightweight, handling only the inbound and outbound
network requests, I believe this should address your concern.
2. The background thread will poll until the heartbeat or the metadata
update timer expires, or until a new event comes in.  As a result, the
background thread does not busy-wait for the next event loop iteration.
3. Similar to #1, the application thread works independently of the
background thread, so there shouldn't be any concern about blocking
behavior.
4. We implemented batching for a few API calls but not all.  We will
implement this optimization after having a working client!
5. The network util should check the timer of each unsent request before
sending it out.  The request will be removed if its timer has expired.
6. The network connections are reused, I believe.

7. The queues aren't used for system calls.  The system calls are issued by
the "Request Manager" at the top of the event loop, and sent out by the end
of the event loop.  As long as the background thread is running, the
heartbeat and metadata updates should continue to happen.  To clarify, we
only use the queues for communication: the application event queue sends
API calls to the background thread, and the background event queue allows
the background thread to notify the user about errors and events such as
callback invocations.
8. Noted.  KAFKA-15173 <https://issues.apache.org/jira/browse/KAFKA-15173> was
added per your suggestion.

9. Noted: KAFKA-15173 <https://issues.apache.org/jira/browse/KAFKA-15173> was
added per your suggestion.
10. Noted. We will spend time examining this item.
11. Noted.
12. Noted.

13. Similar to #7: I think this is the optimization we need to investigate
after delivering the client.  KAFKA-15175 was created for this.
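
To make points 2, 5 and 7 a bit more concrete, here is a rough sketch of one
iteration of the event loop.  It is for illustration only and is not the
actual client code: the names BackgroundLoopSketch, PendingRequest and
runOnce, the injected clock, and the "TimeoutError" string are all made up
for this email, and the "sent" list merely stands in for the network client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical request type: a name plus the absolute time at which it expires.
final class PendingRequest {
    final String name;
    final long deadlineMs;

    PendingRequest(String name, long deadlineMs) {
        this.name = name;
        this.deadlineMs = deadlineMs;
    }
}

// Illustrative only; this is not the real background thread.
class BackgroundLoopSketch {
    // Application thread -> background thread: API calls.
    final BlockingQueue<PendingRequest> applicationEvents = new LinkedBlockingQueue<>();
    // Background thread -> application thread: errors and callback triggers.
    final BlockingQueue<String> backgroundEvents = new LinkedBlockingQueue<>();
    // Stand-in for the network layer: names of requests that were sent out.
    final List<String> sent = new ArrayList<>();

    private final List<PendingRequest> unsent = new ArrayList<>();
    private final LongSupplier clock;
    private final long heartbeatIntervalMs;
    private long nextHeartbeatMs;

    BackgroundLoopSketch(LongSupplier clock, long heartbeatIntervalMs) {
        this.clock = clock;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.nextHeartbeatMs = clock.getAsLong(); // first heartbeat is due immediately
    }

    void runOnce() {
        // Point 2: block until a new event arrives or the heartbeat timer
        // expires, instead of spinning.
        long waitMs = Math.max(0L, nextHeartbeatMs - clock.getAsLong());
        PendingRequest event;
        try {
            event = applicationEvents.poll(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        long now = clock.getAsLong();

        // Point 7: the system call is issued by the loop itself at the top of
        // the iteration; it never goes through the application event queue.
        if (now >= nextHeartbeatMs) {
            unsent.add(new PendingRequest("Heartbeat", now + heartbeatIntervalMs));
            nextHeartbeatMs = now + heartbeatIntervalMs;
        }
        if (event != null) {
            unsent.add(event);
        }

        // Point 5: at the end of the iteration, drop expired requests and
        // send the rest. Failures are reported via the background event queue.
        for (PendingRequest request : unsent) {
            if (request.deadlineMs <= now) {
                backgroundEvents.add("TimeoutError: " + request.name);
            } else {
                sent.add(request.name);
            }
        }
        unsent.clear();
    }
}
```

Note that in this sketch the heartbeat is sent ahead of the user request in
the same iteration only because it is added first; there is still no real
prioritization, which matches what I described in point 1.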

Thanks!
P


On Wed, May 24, 2023 at 7:20 AM Divij Vaidya <divijvaidy...@gmail.com>
wrote:

> Thank you Philip for the response. I will post more comments about this in
> the upcoming days but meanwhile here are my initial thoughts. Please let me
> know what you think about them.
>
> Addition to terminology
> Let's divide the API calls from the consumer into two categories. System
> API calls and User API calls.
>
> System APIs are the ones that are required for ensuring that
> consumers remain connected to the correct broker. Examples of system API
> calls would be heartbeat, metadata etc.
> User APIs are the ones which are invoked on-demand by the user. Examples of
> user API calls would be poll, commit etc.
>
> Now, may I suggest adding the following set of requirements/principles
> which the design should uphold:
> 1. System API calls should work seamlessly even when the consumer is under
> high load from user API calls.
> 2. Threads should consume minimal CPU when not doing any work. i.e. threads
> should not be performing busy waits (infinite loops without any block)
> while idle.
> 3. It should not be possible to block the background thread by the
> application/user.
> 4. Multiple occurrences of idempotent API requests should be bulked
> together. Note that this is different from batch. In bulking, we execute
> one request and use its response as the response to other duplicates /
> similar requests.
> 5. Requests which have exceeded their timeout should not be sent to the
> broker.
> 6. Design should minimize the number of connections created to the broker.
> This means that both system and user API requests should use the same
> network connection. It also means that, in this design, we are not solving
> the problem of network layer starvation of system requests due to too many
> user requests.
>
> Based on the above requirements, I would suggest the following changes:
> 7. We should consider adding another requestQueue (similar to
> ApplicationEvent queue) which will contain only the system events. The
> background thread will first dequeue from this system queue and afterwards
> go to the user event queue (ApplicationEvent queue). A daemon thread for
> each system event will regularly add events to the system queue at a fixed
> periodic interval. As a future optimization, we will piggy-back heartbeat
> with other requests such that any request from the client to the broker
> will be considered as a heartbeat. This solves the prioritization and noisy
> neighbour problem for us amongst the system and user APIs.
> 8. We should add assertions in the callbacks to ensure that the correct
> thread is executing them. As an example, I have found the pattern used by
> netty very useful [1].
>
> Handling of failure scenarios
> The document is missing explanation for handling of various failure
> scenarios. For example, we should document the behaviour for the following:
> 9. When the ApplicationEvent queue gets full. Currently, it is an unbounded
> queue and will fail with an OutOfMemory exception. We may want to bound it
> with a user configurable value. Bounding will help us fail gracefully
> instead of crashing with OOM. The graceful failure in this case should be
> in the form of a re-triable exception to the user with an instruction to
> adopt an exponential backoff and retry.
> 10. When the background thread gets interrupted / killed for some reason,
> we should be able to detect it and close the consumer gracefully.
> 11. We should document how the tear down of the consumer is handled. The
> unsent requests sitting in the queue will probably be discarded and the
> in-flight futures completed exceptionally. We should let the user know
> (via a log) the number of requests that are being discarded.
> 12. We should document how the timeout for APIs which are time bound are
> handled. As an example, if the requests are still in the queue / unsentList
> when timeout hits, do we even want to send it across to the server?
>
> Future improvements:
> 13. Lastly, as a future improvement, we should probably look at using Java
> NIO2's asynchronous socket channel APIs (instead of the current
> implementation of blocking network IO in Kafka). It would greatly improve
> the background thread performance, since unlike the broker, this is the
> single thread doing most of the work on the consumer.
>
> Once we have reached a conclusion on the above points, perhaps, we can
> create JIRAs to address them and work with the community members to resolve
> them.
>
> [1]
>
> https://github.com/netty/netty/blob/57d536c59e434e88c0d8374d6b03671edb02885f/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java#L65
>
>
>
> --
> Divij Vaidya
>
>
>
> On Tue, May 23, 2023 at 6:57 PM Philip Nee <philip...@gmail.com> wrote:
>
> > Hi all!
> >
> > I've added the "KIP-848" and "ctr" tags to the relevant PRs.
> >
> > see:
> > https://github.com/apache/kafka/pulls?q=label%3Actr+sort%3Acreated-asc
> >
> > On Tue, May 9, 2023 at 10:29 AM Philip Nee <philip...@gmail.com> wrote:
> >
> > > Hey Divij,
> > >
> > > > Firstly, THANK YOU for reaching out and providing the feedback.  At the
> > > > moment, I don't have all of the answers to your questions,
> > > > but I hope my responses can be helpful in moving the project forward.
> > >
> > > 1. There isn't a prioritization strategy, and the network request
> buffer
> > > is just FIFO. Basically, all requests are buffered in the network queue
> > and
> > > sent out at the end of the event loop where we poll the client.
> > > Presumably, your concern is that the member can get constantly kicked
> out
> > > of the group due to not sending out a heartbeat soon enough under high
> > load
> > > or network congestion.  I think that's a fair point.  On the other
> hand,
> > we
> > > design the background thread (or we would like the design to be...) to
> be
> > > lightweight and focus on network communication, such that the client
> can
> > > quickly go through all the requests without consuming too many
> resources.
> > > In the case of network congestion, I think there could be other reasons
> > > that cause members to fail, so I don't know if we need the prioritize
> the
> > > requests.
> > >
> > > 2. Yes, as of now, the fetch offset request is being de-duped, and we
> > > would do that for other requests as well.  There's also a RequestState
> > > class that helps request managers track retry backoff (exponential
> > > backoff), which prevents members from spamming brokers.
> > >
> > > 3. There's no implementation of the backpressure for both the
> application
> > > and background event queue, but here is my thought:
> > >  - For ApplicationEventQueue: I can think of a few ways to implement
> > this.
> > > 1. user handles a RetriableError and re-enqueues the event. or 2.
> Having
> > > the application to buffer the events, but then we are just
> implementing a
> > > queue on top of the queue.  What's your suggestion here?
> > >  - For BackgroundEventQueue: The main events are fetches, errors, and
> > > rebalance triggers.  For fetching, what's in my mind is some kind of
> > memory
> > > bound mechanism like KIP-81
> > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >
> > (it
> > > hasn't been implemented).  For rebalancing events or errors, there is
> not
> > > much we could do aside from pausing the request managers.  What do you
> > > think?
> > >
> > > 4. See above.
> > >
> > > 5. I'll add an Observability section to the one-pager. My thinking is
> > > similar to yours: 1. memory and CPU pressure and 2. event latency.
> > >
> > > I would love to incorporate some of your past experience from TinkerPop
> > > into this design, so thank you again for sharing your feedback!
> > >
> > > Thank you!
> > > P
> > >
> > > On Tue, May 9, 2023 at 9:50 AM Divij Vaidya <divijvaidy...@gmail.com>
> > > wrote:
> > >
> > >> Hey Philip
> > >>
> > >> Thank you for writing this down.
> > >>
> > > >> Overall, the idea of decoupling the responsibility amongst threads
> and
> > >> communication using queues looks good. I have a few questions
> > >> associated with handling failure scenarios.
> > >>
> > >> 1. I observed that there is no concept of prioritization in the
> > background
> > >> thread. In case of high load, should a heartbeat handler be
> prioritized
> > >> over listOffset handler?
> > >> 2. Idempotent requests could be "bulked" together, e.g. if we have two
> > >> list
> > >> offsets in the queue, we should be able to call listOffset once and
> > >> duplicate the response for the requests
> > >> 3. What would the back pressure handling behaviour from the
> > >> applicationEventQueue look like? This is bound to occur during
> > >> intermittent
> > >> network disruption
> > >> 4. Same as 3 but for backgroundEventQueue. If the client thread is
> doing
> > >> some I/O since the user is blocking it in their business logic, then
> the
> > >> queue may get full. What happens then? How do we recover and cancel
> the
> > >> resources such as waiting sync APIs?
> > >> 5. Can we add observability/metrics in the design? As an example, the
> > end
> > >> user might measure latency from the client thread but the request may
> > >> actually be spending time in the queue waiting to be picked by the
> > >> background thread. In such cases, user should have the visibility to
> > >> identify that a need to scale the background thread processing is
> > >> required.
> > >>
> > >> --
> > >> Divij Vaidya
> > >>
> > >>
> > >>
> > >> On Sun, Sep 18, 2022 at 11:24 PM Philip Nee <philip...@gmail.com>
> > wrote:
> > >>
> > >> > On Sun, Sep 18, 2022 at 6:03 AM Luke Chen <show...@gmail.com>
> wrote:
> > >> >
> > >> > > Hi Philip,
> > >> > >
> > >> > > Thanks for the write-up.
> > >> > >
> > >> > Also thank you for taking the time to read the proposal.  Very
> > grateful.
> > >> >
> > >> > > Some questions:
> > >> > >
> > >> > > 1. You said: If we don't need a coordinator, the background thread
> > >> will
> > >> > > stay in the *initialized* state.
> > >> > > But in the definition of *initialized, *it said:
> > >> > > *initialized*: The background thread completes initialization, and
> > the
> > >> > loop
> > >> > > has started.
> > >> > > Does that mean, even if we don't need a coordinator, we still
> > create a
> > >> > > coordinator and run the loop? And why?
> > >> > >
> > >> > If we don't need a coordinator, I think the background thread should
> > not
> > >> > fire a FindCoordinator request, which is what is implemented today
> > >> right?
> > >> > In terms of object instantiation, I think we can instantiate the
> > object
> > >> on
> > >> > demand.
> > >> >
> > >> > >
> > >> > > 2. Why do we need so many rebalance states?
> > >> > > After this proposal, we'll change the state number from 4 to 8.
> > >> > > And we will have many state changes and state check in the code.
> > Could
> > >> > you
> > >> > > explain why those states are necessary?
> > >> > > I can imagine, like the method: `rebalanceInProgress`, will need
> to
> > >> check
> > >> > > if the current state in
> > >> > >
> > >> > >
> > >> >
> > >>
> >
> PREPARE_REVOCATION/REVOKING_PARTITION/PARTITION_REVOKED/PREPARING_REBALANCE/.../...
> > >> > > .
> > >> > > So, maybe you should explain why these states are necessary. To
> me,
> > >> like
> > >> > > PREPARE_REVOCATION/REVOKING_PARTITION, I don't understand why we
> > need
> > >> 2
> > >> > > states for them? Any reason there?
> > >> > >
> > >> > I think we need to do 3 things when revoking a partition: There are,
> > >> before
> > >> > the revocation, during the revocation, and after the revocation.
> > >> >   1. Before: prepare the revocation, i.e., pausing the data, send
> out
> > >> > commits (as of the current implementation), and send an event to the
> > >> > polling thread to invoke the callback
> > >> >   2. During: Waiting for the callback to be triggered.  Meanwhile,
> the
> > >> > background thread will continue to poll the channel, until the ack
> > event
> > >> > arrives
> > >> >   3. After: Once the invocation ack event is received, we move on to
> > >> > request to join the group.
> > >> >
> > >> > Maybe you are right, perhaps, we don't need the PARTITION_REVOKED
> > state.
> > >> >
> > >> > >
> > >> > > 3. How could we handle the Subscription state change in poll
> thread
> > >> > earlier
> > >> > > than REVOKE_PARTITION/ASSIGN_PARTITION events arrived?
> > >> > > Suppose, background thread sending REVOKE_PARTITION event to poll
> > >> thread,
> > >> > > and before handing it, the consumer updates the subscription.
> > >> > > So in this situation, we'll still invoke revoke callback or not?
> > >> > > This won't happen in current design because they are handing in
> one
> > >> > thread.
> > >> > >
> > >> >
> > >> > Thanks for bringing this to the table. Would it make sense to lock
> the
> > >> > subscriptionState in some scenarios like this? In this case, this
> > >> requires
> > >> > API changes, like throwing an exception to tell the users that the
> > >> > revocation is happening.
> > >> >
> > >> > >
> > >> > > 4. *Is there a better way to configure session interval and
> > heartbeat
> > >> > > interval?*
> > >> > > These configs are moved to broker side in KIP-848 IIRC.
> > >> > > Maybe you can check that KIP and update here.
> > >> > >
> > >> > Thanks.
> > >> >
> > >> > >
> > >> > > Some typos and hard to understand
> > >> > >
> > >> > Hmm, terribly sorry about these typos.  Turned out I didn't publish
> > the
> > >> > updated copy.  Will address them and update the document right away.
> > >> >
> > >> >
> > >> > > 5. Firstly, it _mcomplexfixes_ increasingly difficult ???
> > >> > > 6. it simplifies the _pd_ design ? What is pd?
> > >> > > 7. In "Background thread and its lifecycle" section, I guess the 2
> > >> points
> > >> > > should be 3c and 3d, right?
> > >> > > That is:
> > >> > > a. Check if there is an in-flight event. If not, poll for the new
> > >> events
> > >> > > from the channel.
> > >> > > b. Run the state machine, and here are the following scenario:
> > >> > > c. Poll the networkClient.
> > >> > > d. Backoff for retryBackoffMs milliseconds.
> > >> > >
> > >> > > Thank you.
> > >> > >
> > >> > Again, really grateful for you to review the proposal, so thank you!
> > >> >
> > >> > P
> > >> >
> > >> > > Luke
> > >> > >
> > >> > > On Sat, Sep 17, 2022 at 6:29 AM Philip Nee <philip...@gmail.com>
> > >> wrote:
> > >> > >
> > >> > > > On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang <
> wangg...@gmail.com
> > >
> > >> > > wrote:
> > >> > > >
> > >> > > > > Hi Philip,
> > >> > > > >
> > >> > > > > On Fri, Sep 16, 2022 at 1:20 PM Philip Nee <
> philip...@gmail.com
> > >
> > >> > > wrote:
> > >> > > > >
> > >> > > > > > Hi Guozhang,
> > >> > > > > >
> > >> > > > > > Thank you for the reviews, and I agree with your
> suggestions:
> > >> If we
> > >> > > > find
> > >> > > > > > any behavior changes, we will issue a KIP. Is it possible to
> > >> first
> > >> > > > > publish
> > >> > > > > > the new implementation in a separate class so that we don't
> > >> disrupt
> > >> > > the
> > >> > > > > > existing implementation? Then we can issue a KIP to address
> > the
> > >> > > > > > regressions.
> > >> > > > > >
> > >> > > > > > Yes! That's what I meant to clarify. Just in case other
> people
> > >> may
> > >> > be
> > >> > > > > confused why we did not propose a KIP right away, but first
> as a
> > >> > > > one-pager.
> > >> > > > >
> > >> > > > >
> > >> > > > > > To your questions:
> > >> > > > > > 1. I think the coordinator discovery will happen on demand,
> > >> i.e.,
> > >> > the
> > >> > > > > > background thread will only try to discover a coordinator
> when
> > >> the
> > >> > > > event
> > >> > > > > > (e.g., commit) requires one. I've noted that, and I'll make
> > sure
> > >> > that
> > >> > > > in
> > >> > > > > > the proposal.
> > >> > > > > >
> > >> > > > > Thanks. Could you then clarify that for the coordinator state
> > >> > > transition?
> > >> > > > > Is that just between `down` and `initialized`, and then
> > >> `initialized`
> > >> > > can
> > >> > > > > then transition back to `down` too?
> > >> > > > >
> > >> > > >
> > >> > > > Will do, I'll update the 1 pager to clarify it.
> > >> > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > > 2. Acked. I think I'll add a section about how each
> operation
> > >> works
> > >> > > in
> > >> > > > > the
> > >> > > > > > new implementation.
> > >> > > > >
> > >> > > > > 3a. I will modify the existing state enum in the new
> > >> implementation.
> > >> > > > > > 3b. Ah, I must have missed this when proofreading the
> > document.
> > >> I
> > >> > > think
> > >> > > > > the
> > >> > > > > > state should be:
> > >> > > > > >   Unjoined -> Commit_async -> Revoking_Partitions ->
> > >> > > Partitions_Revoked
> > >> > > > > ->
> > > >> > > > > > Join_Group -> Completed_Rebalancing -> Assigning_Partitions
> ->
> > >> > > > > > Partitions_Assigned -> Stable
> > >> > > > > >   I've made a note of that, and I'll update the document.
> > >> > > > > >
> > >> > > > > Got it, if we are introducing a third state for auto
> committing,
> > >> then
> > >> > > > upon
> > >> > > > > completing the rebalance, we may also need to transit to that
> > >> state
> > >> > > since
> > >> > > > > we may also revoke partitions, right? This is for fixing
> > >> KAFKA-14224
> > >> > > > >
> > >> > > > - Instead of introducing a new state, I think I want to break up
> > the
> > >> > > > PREPARING_REBALANCE into Commit and Revoke.  So I think for your
> > >> > > suggestion
> > > >> > > > in 14224, potentially we could do it this way.
> > >> > > > 1. REVOKING_PARTITION: mute the partition and send the callback
> > >> event
> > >> > to
> > >> > > > the polling thread
> > >> > > > 2. PARTITION_REVOKED: Received the revocation ack.  Advance the
> > >> state
> > >> > to
> > >> > > > COMMIT.
> > >> > > > 3. COMMIT: Commit the offsets.  Wait for the commit to finish,
> > then
> > >> we
> > >> > > can
> > >> > > > start the join group.
> > >> > > >
> > >> > > > Again, thanks, I'll update these changes in the 1pager.
> > >> > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > > 4. Noted. I'll add a section about exception handling.
> > >> > > > > > 5a.  Kirk also had the same comment. I updated the document.
> > >> > > > > > 5b. yes!
> > >> > > > > >
> > >> > > > > > Regarding the POC comments, I think the POC branch is
> actually
> > >> > quite
> > >> > > > > dated.
> > >> > > > > > I need to update it 😅. Do you think we can start with
> simple
> > >> PRs
> > >> > > like
> > >> > > > > > skeletons of ConsumerBackgroundThread.java and EventHandler
> > >> > interface
> > >> > > > and
> > >> > > > > > implementations?
> > >> > > > > >
> > >> > > > > > Yes, I think as long as we are introducing new classes /
> > >> interfaces
> > >> > > the
> > >> > > > > internal package, we can incrementally merge PRs even if they
> > are
> > >> not
> > >> > > > > integrated with the public KafkaConsumer class yet.
> > >> > > > >
> > >> > > > >
> > >> > > > > > 1. I agree the naming can be confusing, but I think the
> usage
> > of
> > >> > > > response
> > >> > > > > > and request has implications for the type of event. As you
> > >> said, BT
> > >> > > ->
> > >> > > > PT
> > >> > > > > > handles error and callback events, and PT -> BT handles
> mostly
> > >> API
> > >> > > > calls.
> > >> > > > > > I'll think again about these naming.
> > >> > > > > > 2. Great suggestions.
> > >> > > > > > 3. I think I'll start with defining a few basic events, like
> > >> > assign,
> > >> > > > > > commits, and fetch. Then we can modify that as we progress
> > with
> > >> the
> > >> > > > > > project.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > P
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <
> > >> wangg...@gmail.com>
> > >> > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hello Philip,
> > >> > > > > > >
> > >> > > > > > > Thanks for writing down the 1-pager. Just to clarify, the
> > >> reason
> > >> > we
> > >> > > > > wrote
> > >> > > > > > > this as a 1-pager instead of a KIP is that so far all the
> > >> > > > > implementations
> > >> > > > > > > should be internal, and hence not impacting anything on
> the
> > >> > public
> > >> > > > > APIs.
> > >> > > > > > > If, e.g. we found along with the implementation that some
> > >> public
> > >> > > > > > interfaces
> > >> > > > > > > like metrics, need to be modified, then we will send out a
> > >> > separate
> > >> > > > > > thread
> > >> > > > > > > for that in a KIP proposal, right?
> > >> > > > > > >
> > >> > > > > > > I made a pass on the 1-pager and also some of the ongoing
> > >> > > > developments,
> > >> > > > > > > here are just some high-level comments:
> > >> > > > > > >
> > >> > > > > > > On the 1-pager:
> > >> > > > > > >
> > >> > > > > > > 1) I think it's better to clarify the scenarios under
> manual
> > >> > > > assignment
> > >> > > > > > > a.k.a. `consumer.assign()`. E.g. does the background
> thread
> > >> still
> > >> > > > > always
> > >> > > > > > > tries to discover coordinator even if there's no commit
> > >> requests
> > >> > > > (note
> > >> > > > > > > that, if yes, this would be a behavioral change since some
> > >> users
> > >> > > like
> > >> > > > > > > Connect may rely on the consumer to not ever try to
> discover
> > >> the
> > >> > > > > > > coordinator with manual assignment and no commit to
> > brokers)?
> > >> > From
> > >> > > > the
> > >> > > > > > > description it seems if there's no events from the channel
> > to
> > >> > > request
> > >> > > > > > > committing offsets, it would not try to discover
> > coordinator,
> > >> but
> > >> > > > then
> > >> > > > > > the
> > > >> > > > > > > background thread's state would be in `down` and
> > >> > > `initialized`,
> > >> > > > > not
> > >> > > > > > > in `stable`, and hence we should also allow transition
> from
> > >> > > > > `initialized`
> > >> > > > > > > to `down` directly, right?
> > >> > > > > > >
> > >> > > > > > > 2) From the polling thread, besides the `poll` function
> > >> changes,
> > >> > I
> > >> > > > > think
> > >> > > > > > > it's better to also state other blocking function changes
> > like
> > >> > > > > commitSync
> > >> > > > > > > as well. I'd assume e.g. for commitSync it would be
> > >> implemented
> > >> > as:
> > >> > > > > > >
> > >> > > > > > > * Send the commit-request to the server-event channel
> > >> > > > > > > * Continue polling from the consumer event channel, but
> skip
> > >> > other
> > >> > > > > events
> > >> > > > > > > like rebalance-listener (we still need to bookkeep it for
> > the
> > >> > next
> > >> > > > > `poll`
> > >> > > > > > > call, but we just cannot trigger them since that breaks
> > >> > > > compatibility)
> > >> > > > > > > until we received the commit response event.
> > >> > > > > > >
> > >> > > > > > > Some details about how those functions would be
> implemented
> > >> would
> > >> > > > also
> > >> > > > > be
> > >> > > > > > > very helpful for the community's audience.
> > >> > > > > > >
> > >> > > > > > > 3) I have a few questions from the rebalance state machine
> > >> > section:
> > >> > > > > > >
> > >> > > > > > > 3.a). you mentioned in the state machine:
> > >> > > > > > >
> > >> > > > > > > "Wait for the partition revoked event, and advance the
> state
> > >> to
> > >> > > > > > > PARTITIONS_REVOKED"
> > >> > > > > > > "Wait for partition assignment completion from the polling
> > >> > thread.
> > >> > > > > > Advance
> > >> > > > > > > to PARTITIONS_ASSIGNED"
> > >> > > > > > >
> > >> > > > > > > But we do not have those states today, I think you meant
> to
> > >> say
> > >> > > > > > > "PREPARE_REBALANCING" and "STABLE"?
> > >> > > > > > >
> > >> > > > > > > 3.b). Also, it seems that the proposed state transition
> > would
> > >> be
> > >> > > > Stable
> > >> > > > > > ->
> > >> > > > > > > Revoking_Partitions -> Prepare_Rebalancing ->
> > >> > Complete_Rebalancing
> > >> > > ->
> > >> > > > > > > Assigning_Partitions -> Stable (for eager protocol at
> > least),
> > >> but
> > >> > > > when
> > >> > > > > > auto
> > >> > > > > > > commit is enabled, we also need to commit offsets for
> those
> > >> > > revoking
> > >> > > > > > > partitions, and the auto-committing happens before the
> > >> > > > > > > `onPartitionsRevoked` listener is triggered, so should
> > >> > > > auto-committing
> > >> > > > > be
> > >> > > > > > > part of the `Revoking_Partitions` state as well?
> > >> > > > > > >
> > >> > > > > > > 4) Could you expand on the "Exceptions thrown will be
> > >> different."
> > >> > > > > > > description and list all changes to the exceptions (note
> > >> that, if
> > >> > > > they
> > >> > > > > do
> > >> > > > > > > exist, we also need a KIP). For example, besides
> > >> WakeupException,
> > >> > > > Kafka
> > >> > > > > > > also have a InterruptException (different from Java's own
> > >> > > > > > > InterruptedException) defined on the public APIs, are we
> > >> going to
> > >> > > > > change
> > >> > > > > > > which functions would throw and which will not?
> > >> > > > > > >
> > >> > > > > > > 5) In the testing plan:
> > >> > > > > > >
> > >> > > > > > > 5.a) Could you elaborate a bit more on "We need to make
> sure
> > >> the
> > >> > > > timing
> > >> > > > > > of
> > >> > > > > > > the 1.  coordinator discovery and 2.  joinGroup operations
> > are
> > >> > > being
> > >> > > > > done
> > >> > > > > > > in the correct timing."
> > >> > > > > > >
> > >> > > > > > > 5.b) I'd also add that for all the blocking APIs including
> > >> `poll`
> > >> > > > > where a
> > >> > > > > > > timeout value is provided either as param, or implicitly
> > from
> > >> `
> > >> > > > > > > default.api.timeout.ms` they would now be strictly
> > respected
> > >> ---
> > >> > > to
> > >> > > > me
> > >> > > > > > > this
> > >> > > > > > > is also one of the key motivations of this refactoring :)
> > >> > > > > > >
> > >> > > > > > > -------------------------
> > >> > > > > > >
> > >> > > > > > > And the current POC PRs:
> > >> > > > > > >
> > >> > > > > > > 1. Just a quick thought about the naming of
> > >> > > "KafkaServerEventQueue":
> > >> > > > I
> > >> > > > > > > think I also agree with others that it may be confusing to
> > >> > include
> > >> > > > > > > `KafkaServer` on a client class, what about renaming it to
> > >> > > > > > > "ConsumerRequestEventQueue` and
> > `ConsumerResponseEventQueue`?
> > >> I
> > >> > > know
> > >> > > > > that
> > >> > > > > > > it sounds a bit awkward to have the `ResponseEventQueue`
> to
> > >> also
> > >> > > > return
> > >> > > > > > > rebalance-listener-triggering events, but that may be less
> > >> > > confusing.
> > >> > > > > > >
> > >> > > > > > > 2. I'd suggest for new modules like
> > >> `ConsumerBackgroundThread`,
> > >> > we
> > >> > > > > first
> > >> > > > > > > defines an interface in the `internals` package, e.g.
> > >> > > > > > `RequestEventHandler`
> > >> > > > > > > (assuming the previous rename suggestion), and then have a
> > >> > > > > > > `DefaultRequestEventHandler` implementation class which
> > >> > encapsulate
> > >> > > > the
> > >> > > > > > > background thread. This enables us to easily write unit
> > tests
> > >> > that
> > >> > > > > > isolate
> > >> > > > > > > other modules especially with concurrent threadings.
> > >> > > > > > >
> > >> > > > > > > 3. For `KafkaServerEventType`: where would NOOP being
> used?
> > >> Also
> > >> > I
> > >> > > > > think
> > >> > > > > > > there are other types, like FETCH_COMMITTED as well?
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Guozhang
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <
> > >> philip...@gmail.com>
> > >> > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Hi all,
> > >> > > > > > > >
> > >> > > > > > > > Here is the proposal to refactor the Kafka Consumer
> > >> > > > > > > > <
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
> > >> > > > > > > > >.
> > >> > > > > > > > The 1-pager is on the wiki, so please take a look at it.
> > >> Also,
> > >> > > > this
> > >> > > > > > is a
> > >> > > > > > > > prerequisite for KIP-848 (the next gen rebalance
> > protocol).
> > >> > > > > > > >
> > >> > > > > > > > Cheers,
> > >> > > > > > > > P
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > -- Guozhang
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > > -- Guozhang
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>
