Hi all! I've added the "KIP-848" and "ctr" tags to the relevant PRs.
see: https://github.com/apache/kafka/pulls?q=label%3Actr+sort%3Acreated-asc

On Tue, May 9, 2023 at 10:29 AM Philip Nee <philip...@gmail.com> wrote:

> Hey Divij,
>
> Firstly, THANK YOU for reaching out and providing the feedback. I don't have all of the answers to your questions at the moment, but I hope my responses can help move the project forward.
>
> 1. There isn't a prioritization strategy, and the network request buffer is just FIFO. Basically, all requests are buffered in the network queue and sent out at the end of the event loop where we poll the client. Presumably, your concern is that the member can get constantly kicked out of the group due to not sending out a heartbeat soon enough under high load or network congestion. I think that's a fair point. On the other hand, we design the background thread (or we would like the design to be...) to be lightweight and focused on network communication, such that the client can quickly go through all the requests without consuming too many resources. In the case of network congestion, I think there could be other reasons that cause members to fail, so I don't know if we need to prioritize the requests.
>
> 2. Yes, as of now, the fetch offset request is being de-duped, and we would do that for other requests as well. There's also a RequestState class that helps request managers track retry backoff (exponential backoff), which prevents members from spamming brokers.
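>
> To illustrate the backoff idea in 2 (a rough sketch only, not the actual RequestState code), the tracking could look something like this:
>
> public class RequestState {
>     private final long retryBackoffMs;     // base backoff between retries
>     private final long retryBackoffMaxMs;  // cap on the exponential backoff
>     private int attempts = 0;
>     private long lastSentMs = -1;
>
>     public RequestState(long retryBackoffMs, long retryBackoffMaxMs) {
>         this.retryBackoffMs = retryBackoffMs;
>         this.retryBackoffMaxMs = retryBackoffMaxMs;
>     }
>
>     // The request manager checks this before re-sending, so a failing
>     // request cannot spam the broker on every loop iteration.
>     public boolean canSendRequest(long nowMs) {
>         if (lastSentMs == -1)
>             return true; // never sent before
>         long backoffMs = Math.min(retryBackoffMaxMs,
>                 retryBackoffMs * (1L << Math.min(attempts - 1, 10)));
>         return nowMs - lastSentMs >= backoffMs;
>     }
>
>     public void onSendAttempt(long nowMs) {
>         lastSentMs = nowMs;
>         attempts++;
>     }
>
>     public void onSuccessfulAttempt() {
>         attempts = 0;
>         lastSentMs = -1;
>     }
> }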
>
> 3. There's no backpressure implementation for either the application or the background event queue yet, but here are my thoughts:
> - For ApplicationEventQueue: I can think of a few ways to implement this: 1. the user handles a RetriableError and re-enqueues the event (sketched below), or 2. having the application buffer the events, but then we are just implementing a queue on top of the queue. What's your suggestion here?
> - For BackgroundEventQueue: The main events are fetches, errors, and rebalance triggers. For fetching, what I have in mind is some kind of memory-bound mechanism like KIP-81 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer> (it hasn't been implemented). For rebalancing events or errors, there is not much we could do aside from pausing the request managers. What do you think?
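>
> To make option 1 concrete (again, a sketch with hypothetical names, not committed code), the queue could fail fast when full and let the polling thread surface a retriable error:
>
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.BlockingQueue;
>
> // Bounded PT -> BT queue: offer() fails fast when full, so the polling
> // thread can surface a retriable error instead of blocking the user.
> public class BoundedApplicationEventQueue<T> {
>     private final BlockingQueue<T> queue;
>
>     public BoundedApplicationEventQueue(int capacity) {
>         this.queue = new ArrayBlockingQueue<>(capacity);
>     }
>
>     public void enqueue(T event) {
>         if (!queue.offer(event)) {
>             // Queue is full: the user handles this and re-enqueues later.
>             throw new RetriableQueueFullException("application event queue is full, retry later");
>         }
>     }
>
>     public T poll() {
>         return queue.poll(); // drained by the background thread each loop
>     }
>
>     // Hypothetical exception type, for illustration only.
>     public static class RetriableQueueFullException extends RuntimeException {
>         public RetriableQueueFullException(String message) {
>             super(message);
>         }
>     }
> }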
>
> 4. See above.
>
> 5. I'll add an Observability section to the one-pager. My thinking is similar to yours: 1. memory and CPU pressure and 2. event latency.
>
> I would love to incorporate some of your past experience from TinkerPop into this design, so thank you again for sharing your feedback!
>
> Thank you!
> P
>
> On Tue, May 9, 2023 at 9:50 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
>
>> Hey Philip,
>>
>> Thank you for writing this down.
>>
>> Overall, the idea of decoupling the responsibilities amongst threads and communicating using queues looks good. I have a few questions associated with handling failure scenarios.
>>
>> 1. I observed that there is no concept of prioritization in the background thread. In case of high load, should a heartbeat handler be prioritized over a listOffset handler?
>> 2. Idempotent requests could be "bulked" together, e.g. if we have two list offsets in the queue, we should be able to call listOffset once and duplicate the response for both requests.
>> 3. What would the back pressure handling behaviour from the applicationEventQueue look like? This is bound to occur during intermittent network disruption.
>> 4. Same as 3 but for the backgroundEventQueue. If the client thread is doing some I/O because the user is blocking it in their business logic, then the queue may get full. What happens then? How do we recover and cancel resources such as waiting sync APIs?
>> 5. Can we add observability/metrics to the design? As an example, the end user might measure latency from the client thread, but the request may actually be spending time in the queue waiting to be picked up by the background thread. In such cases, the user should have the visibility to identify that the background thread processing needs to be scaled.
>>
>> --
>> Divij Vaidya
>>
>> On Sun, Sep 18, 2022 at 11:24 PM Philip Nee <philip...@gmail.com> wrote:
>>
>> > On Sun, Sep 18, 2022 at 6:03 AM Luke Chen <show...@gmail.com> wrote:
>> >
>> > > Hi Philip,
>> > >
>> > > Thanks for the write-up.
>> >
>> > Also thank you for taking the time to read the proposal. Very grateful.
>> >
>> > > Some questions:
>> > >
>> > > 1. You said: If we don't need a coordinator, the background thread will stay in the *initialized* state. But the definition of *initialized* says: The background thread completes initialization, and the loop has started. Does that mean, even if we don't need a coordinator, we still create a coordinator and run the loop? And why?
>> >
>> > If we don't need a coordinator, I think the background thread should not fire a FindCoordinator request, which is what is implemented today, right? In terms of object instantiation, I think we can instantiate the object on demand.
>> >
>> > > 2. Why do we need so many rebalance states? After this proposal, we'll change the number of states from 4 to 8, and we will have many state changes and state checks in the code. Could you explain why those states are necessary? I can imagine that a method like `rebalanceInProgress` will need to check whether the current state is in PREPARE_REVOCATION/REVOKING_PARTITION/PARTITION_REVOKED/PREPARING_REBALANCE/.../... So, maybe you should explain why these states are necessary. To me, like PREPARE_REVOCATION/REVOKING_PARTITION, I don't understand why we need 2 states for them? Any reason there?
>> >
>> > I think we need to do 3 things when revoking a partition: before the revocation, during the revocation, and after the revocation.
>> > 1. Before: prepare the revocation, i.e., pause the data, send out commits (as of the current implementation), and send an event to the polling thread to invoke the callback.
>> > 2. During: wait for the callback to be triggered. Meanwhile, the background thread will continue to poll the channel until the ack event arrives.
>> > 3. After: once the invocation ack event is received, we move on to requesting to join the group.
>> >
>> > Maybe you are right; perhaps we don't need the PARTITION_REVOKED state.
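>> >
>> > To sketch what I mean (illustrative names only, not what's in the 1-pager), the background thread could drive the three phases like this:
>> >
>> > // Sketch only: how the background thread could walk through the three
>> > // revocation phases. All names here are illustrative.
>> > public class RevocationFlow {
>> >     enum Phase { PREPARE_REVOCATION, REVOKING_PARTITION, PARTITION_REVOKED }
>> >
>> >     private Phase phase = Phase.PREPARE_REVOCATION;
>> >
>> >     // Called once per background-thread loop iteration.
>> >     public void poll(boolean callbackAckReceived) {
>> >         switch (phase) {
>> >             case PREPARE_REVOCATION:
>> >                 // Before: pause the data, send out commits, and hand the
>> >                 // callback event to the polling thread.
>> >                 pausePartitions();
>> >                 sendCommits();
>> >                 enqueueRevocationCallbackEvent();
>> >                 phase = Phase.REVOKING_PARTITION;
>> >                 break;
>> >             case REVOKING_PARTITION:
>> >                 // During: keep polling the channel until the ack arrives.
>> >                 if (callbackAckReceived)
>> >                     phase = Phase.PARTITION_REVOKED;
>> >                 break;
>> >             case PARTITION_REVOKED:
>> >                 // After: the callback has run; request to join the group.
>> >                 sendJoinGroupRequest();
>> >                 break;
>> >         }
>> >     }
>> >
>> >     private void pausePartitions() { /* mute fetches for revoked partitions */ }
>> >     private void sendCommits() { /* commit offsets, as in the current implementation */ }
>> >     private void enqueueRevocationCallbackEvent() { /* BT -> PT callback event */ }
>> >     private void sendJoinGroupRequest() { /* rejoin once revocation is acked */ }
>> > }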
>> >
>> > > 3. How could we handle the Subscription state change in the poll thread earlier than the REVOKE_PARTITION/ASSIGN_PARTITION events arrive? Suppose the background thread sends a REVOKE_PARTITION event to the poll thread, and before handling it, the consumer updates the subscription. In this situation, will we still invoke the revoke callback or not? This won't happen in the current design because they are handled in one thread.
>> >
>> > Thanks for bringing this to the table. Would it make sense to lock the subscriptionState in scenarios like this? In this case, this requires API changes, like throwing an exception to tell the users that the revocation is happening.
>> >
>> > > 4. *Is there a better way to configure session interval and heartbeat interval?* These configs were moved to the broker side in KIP-848 IIRC. Maybe you can check that KIP and update here.
>> >
>> > Thanks.
>> >
>> > > Some typos and hard-to-understand parts:
>> >
>> > Hmm, terribly sorry about these typos. It turned out I didn't publish the updated copy. Will address them and update the document right away.
>> >
>> > > 5. Firstly, it _mcomplexfixes_ increasingly difficult ???
>> > > 6. it simplifies the _pd_ design ? What is pd?
>> > > 7. In the "Background thread and its lifecycle" section, I guess the 2 points should be 3c and 3d, right? That is:
>> > > a. Check if there is an in-flight event. If not, poll for the new events from the channel.
>> > > b. Run the state machine, and here are the following scenarios:
>> > > c. Poll the networkClient.
>> > > d. Backoff for retryBackoffMs milliseconds.
>> > >
>> > > Thank you.
>> >
>> > Again, really grateful for you to review the proposal, so thank you!
>> >
>> > P
>> >
>> > > Luke
>> > >
>> > > On Sat, Sep 17, 2022 at 6:29 AM Philip Nee <philip...@gmail.com> wrote:
>> > >
>> > > > On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >
>> > > > > Hi Philip,
>> > > > >
>> > > > > On Fri, Sep 16, 2022 at 1:20 PM Philip Nee <philip...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Guozhang,
>> > > > > >
>> > > > > > Thank you for the reviews, and I agree with your suggestions: if we find any behavior changes, we will issue a KIP. Is it possible to first publish the new implementation in a separate class so that we don't disrupt the existing implementation? Then we can issue a KIP to address the regressions.
>> > > > >
>> > > > > Yes! That's what I meant to clarify. Just in case other people may be confused why we did not propose a KIP right away, but first as a one-pager.
>> > > > >
>> > > > > > To your questions:
>> > > > > > 1. I think the coordinator discovery will happen on demand, i.e., the background thread will only try to discover a coordinator when the event (e.g., commit) requires one. I've noted that, and I'll make sure that's in the proposal.
>> > > > >
>> > > > > Thanks. Could you then clarify that for the coordinator state transition? Is that just between `down` and `initialized`, and can `initialized` then transition back to `down` too?
>> > > >
>> > > > Will do, I'll update the 1-pager to clarify it.
>> > > >
>> > > > > > 2. Acked. I think I'll add a section about how each operation works in the new implementation.
>> > > > > > 3a. I will modify the existing state enum in the new implementation.
>> > > > > > 3b. Ah, I must have missed this when proofreading the document. I think the state should be: Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked -> Join_Group -> Completed_Rebalancing -> Assigning_Partitions -> Partitions_Assigned -> Stable. I've made a note of that, and I'll update the document.
>> > > > >
>> > > > > Got it. If we are introducing a third state for auto committing, then upon completing the rebalance, we may also need to transition to that state since we may also revoke partitions, right? This is for fixing KAFKA-14224.
>> > > >
>> > > > Instead of introducing a new state, I think I want to break up PREPARING_REBALANCE into Commit and Revoke. So for your suggestion in 14224, I think we could potentially do it this way:
>> > > > 1. REVOKING_PARTITION: mute the partition and send the callback event to the polling thread.
>> > > > 2. PARTITION_REVOKED: received the revocation ack. Advance the state to COMMIT.
>> > > > 3. COMMIT: commit the offsets. Wait for the commit to finish, then we can start the join group.
>> > > >
>> > > > Again, thanks, I'll update these changes in the 1-pager.
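>> > > >
>> > > > As a rough sketch (illustrative names until I update the 1-pager), the broken-up states would advance like this:
>> > > >
>> > > > // Illustrative only: the broken-up states and the order they advance in.
>> > > > public enum RebalanceState {
>> > > >     REVOKING_PARTITION,  // partitions muted, callback event sent to the polling thread
>> > > >     PARTITION_REVOKED,   // revocation ack received from the polling thread
>> > > >     COMMIT,              // committing offsets; wait for the commit to finish
>> > > >     JOINING_GROUP;       // commit done, the join group request can go out
>> > > >
>> > > >     public RebalanceState advance() {
>> > > >         switch (this) {
>> > > >             case REVOKING_PARTITION: return PARTITION_REVOKED;
>> > > >             case PARTITION_REVOKED:  return COMMIT;
>> > > >             case COMMIT:             return JOINING_GROUP;
>> > > >             default:                 return this;
>> > > >         }
>> > > >     }
>> > > > }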
>> > > > > >
>> > > > > > 4. Noted. I'll add a section about exception handling.
>> > > > > > 5a. Kirk also had the same comment. I updated the document.
>> > > > > > 5b. Yes!
>> > > > > >
>> > > > > > Regarding the POC comments, I think the POC branch is actually quite dated. I need to update it 😅. Do you think we can start with simple PRs like skeletons of ConsumerBackgroundThread.java and the EventHandler interface and implementations?
>> > > > >
>> > > > > Yes, I think as long as we are introducing new classes / interfaces in the internals package, we can incrementally merge PRs even if they are not integrated with the public KafkaConsumer class yet.
>> > > > >
>> > > > > > 1. I agree the naming can be confusing, but I think the usage of response and request has implications for the type of event. As you said, BT -> PT handles error and callback events, and PT -> BT handles mostly API calls. I'll think again about the naming.
>> > > > > > 2. Great suggestions.
>> > > > > > 3. I think I'll start with defining a few basic events, like assign, commit, and fetch. Then we can modify that as we progress with the project.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > P
>> > > > > >
>> > > > > > On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hello Philip,
>> > > > > > >
>> > > > > > > Thanks for writing down the 1-pager. Just to clarify, the reason we wrote this as a 1-pager instead of a KIP is that so far all the implementations should be internal, and hence not impacting anything on the public APIs. If, e.g., we found along with the implementation that some public interfaces, like metrics, need to be modified, then we will send out a separate thread for that in a KIP proposal, right?
>> > > > > > >
>> > > > > > > I made a pass on the 1-pager and also some of the ongoing developments; here are just some high-level comments:
>> > > > > > >
>> > > > > > > On the 1-pager:
>> > > > > > >
>> > > > > > > 1) I think it's better to clarify the scenarios under manual assignment, a.k.a. `consumer.assign()`. E.g., does the background thread still always try to discover the coordinator even if there are no commit requests (note that, if yes, this would be a behavioral change, since some users like Connect may rely on the consumer to not ever try to discover the coordinator with manual assignment and no commits to brokers)? From the description, it seems that if there are no events from the channel requesting to commit offsets, it would not try to discover the coordinator, but then the background thread's state would be `down` or `initialized`, not `stable`, and hence we should also allow a transition from `initialized` to `down` directly, right?
>> > > > > > >
>> > > > > > > 2) From the polling thread, besides the `poll` function changes, I think it's better to also state other blocking function changes like commitSync as well. I'd assume e.g. for commitSync it would be implemented as:
>> > > > > > >
>> > > > > > > * Send the commit-request to the server-event channel.
>> > > > > > > * Continue polling from the consumer event channel, but skip other events like rebalance-listener events (we still need to bookkeep them for the next `poll` call, but we just cannot trigger them since that breaks compatibility) until we receive the commit response event.
>> > > > > > >
>> > > > > > > Some details about how those functions would be implemented would also be very helpful for the community's audience.
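>> > > > > > >
>> > > > > > > i.e., something like the following sketch (with hypothetical event and queue names, just to convey the idea):
>> > > > > > >
>> > > > > > > import java.time.Duration;
>> > > > > > > import java.util.ArrayList;
>> > > > > > > import java.util.List;
>> > > > > > > import java.util.concurrent.BlockingQueue;
>> > > > > > > import java.util.concurrent.LinkedBlockingQueue;
>> > > > > > > import java.util.concurrent.TimeUnit;
>> > > > > > >
>> > > > > > > // Sketch: commitSync blocks on the commit response event while
>> > > > > > > // deferring everything else to the next poll() call.
>> > > > > > > public class CommitSyncSketch {
>> > > > > > >     interface Event {}
>> > > > > > >     static class CommitRequestEvent implements Event {}
>> > > > > > >     static class CommitResponseEvent implements Event {}
>> > > > > > >
>> > > > > > >     private final BlockingQueue<Event> applicationEventQueue = new LinkedBlockingQueue<>();
>> > > > > > >     private final BlockingQueue<Event> backgroundEventQueue = new LinkedBlockingQueue<>();
>> > > > > > >     private final List<Event> deferredEvents = new ArrayList<>(); // replayed on next poll()
>> > > > > > >
>> > > > > > >     public void commitSync(Duration timeout) throws InterruptedException {
>> > > > > > >         applicationEventQueue.add(new CommitRequestEvent()); // hand off to the background thread
>> > > > > > >         long deadline = System.currentTimeMillis() + timeout.toMillis();
>> > > > > > >         while (System.currentTimeMillis() < deadline) {
>> > > > > > >             Event event = backgroundEventQueue.poll(100, TimeUnit.MILLISECONDS);
>> > > > > > >             if (event == null) continue;
>> > > > > > >             if (event instanceof CommitResponseEvent) return; // commit completed
>> > > > > > >             deferredEvents.add(event); // e.g. rebalance-listener events: bookkeep, don't trigger
>> > > > > > >         }
>> > > > > > >         throw new RuntimeException("commitSync timed out"); // would be Kafka's TimeoutException
>> > > > > > >     }
>> > > > > > > }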
>> > > > > > >
>> > > > > > > 3) I have a few questions on the rebalance state machine section:
>> > > > > > >
>> > > > > > > 3.a) You mentioned in the state machine:
>> > > > > > > "Wait for the partition revoked event, and advance the state to PARTITIONS_REVOKED"
>> > > > > > > "Wait for partition assignment completion from the polling thread. Advance to PARTITIONS_ASSIGNED"
>> > > > > > > But we do not have those states today; I think you meant to say "PREPARE_REBALANCING" and "STABLE"?
>> > > > > > >
>> > > > > > > 3.b) Also, it seems that the proposed state transition would be Stable -> Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing -> Assigning_Partitions -> Stable (for the eager protocol at least), but when auto commit is enabled, we also need to commit offsets for those revoking partitions, and the auto-committing happens before the `onPartitionsRevoked` listener is triggered, so should auto-committing be part of the `Revoking_Partitions` state as well?
>> > > > > > >
>> > > > > > > 4) Could you expand on the "Exceptions thrown will be different." description and list all changes to the exceptions (note that, if they do exist, we also need a KIP)? For example, besides WakeupException, Kafka also has an InterruptException (different from Java's own InterruptedException) defined in the public APIs; are we going to change which functions would throw it and which would not?
>> > > > > > >
>> > > > > > > 5) In the testing plan:
>> > > > > > >
>> > > > > > > 5.a) Could you elaborate a bit more on "We need to make sure the timing of the 1. coordinator discovery and 2. joinGroup operations are being done in the correct timing."
>> > > > > > >
>> > > > > > > 5.b) I'd also add that for all the blocking APIs, including `poll`, where a timeout value is provided either as a param or implicitly from `default.api.timeout.ms`, the timeout would now be strictly respected --- to me this is also one of the key motivations of this refactoring :)
>> > > > > > >
>> > > > > > > -------------------------
>> > > > > > >
>> > > > > > > And the current POC PRs:
>> > > > > > >
>> > > > > > > 1. Just a quick thought about the naming of "KafkaServerEventQueue": I think I also agree with others that it may be confusing to include `KafkaServer` in a client class; what about renaming them to `ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know that it sounds a bit awkward to have the `ResponseEventQueue` also return rebalance-listener-triggering events, but that may be less confusing.
>> > > > > > >
>> > > > > > > 2. I'd suggest for new modules like `ConsumerBackgroundThread`, we first define an interface in the `internals` package, e.g. `RequestEventHandler` (assuming the previous rename suggestion), and then have a `DefaultRequestEventHandler` implementation class which encapsulates the background thread. This enables us to easily write unit tests that isolate other modules, especially with concurrent threading.
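>> > > > > > >
>> > > > > > > For example, a skeleton of what I have in mind (shape only, not a real patch):
>> > > > > > >
>> > > > > > > import java.util.concurrent.BlockingQueue;
>> > > > > > > import java.util.concurrent.LinkedBlockingQueue;
>> > > > > > > import java.util.concurrent.TimeUnit;
>> > > > > > >
>> > > > > > > // Skeleton only: an interface in the internals package plus a default
>> > > > > > > // implementation that encapsulates the background thread, so unit
>> > > > > > > // tests can mock the handler without any real threading.
>> > > > > > > interface RequestEvent {}
>> > > > > > > interface ResponseEvent {}
>> > > > > > >
>> > > > > > > interface RequestEventHandler {
>> > > > > > >     void add(RequestEvent event);  // polling thread -> background thread
>> > > > > > >     ResponseEvent poll();          // background thread -> polling thread, null if none
>> > > > > > >     void close();
>> > > > > > > }
>> > > > > > >
>> > > > > > > class DefaultRequestEventHandler implements RequestEventHandler {
>> > > > > > >     private final BlockingQueue<RequestEvent> requestQueue = new LinkedBlockingQueue<>();
>> > > > > > >     private final BlockingQueue<ResponseEvent> responseQueue = new LinkedBlockingQueue<>();
>> > > > > > >     private final Thread backgroundThread;
>> > > > > > >
>> > > > > > >     DefaultRequestEventHandler() {
>> > > > > > >         backgroundThread = new Thread(this::runLoop, "consumer-background-thread");
>> > > > > > >         backgroundThread.setDaemon(true);
>> > > > > > >         backgroundThread.start();
>> > > > > > >     }
>> > > > > > >
>> > > > > > >     private void runLoop() {
>> > > > > > >         try {
>> > > > > > >             while (!Thread.currentThread().isInterrupted()) {
>> > > > > > >                 // Drain request events, run the state machine, poll the network.
>> > > > > > >                 RequestEvent event = requestQueue.poll(100, TimeUnit.MILLISECONDS);
>> > > > > > >                 if (event != null) {
>> > > > > > >                     // handle the event; results come back via responseQueue
>> > > > > > >                 }
>> > > > > > >             }
>> > > > > > >         } catch (InterruptedException e) {
>> > > > > > >             // shutting down
>> > > > > > >         }
>> > > > > > >     }
>> > > > > > >
>> > > > > > >     @Override
>> > > > > > >     public void add(RequestEvent event) { requestQueue.add(event); }
>> > > > > > >
>> > > > > > >     @Override
>> > > > > > >     public ResponseEvent poll() { return responseQueue.poll(); }
>> > > > > > >
>> > > > > > >     @Override
>> > > > > > >     public void close() { backgroundThread.interrupt(); }
>> > > > > > > }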
>> > > > > > >
>> > > > > > > 3. For `KafkaServerEventType`: where would NOOP be used? Also, I think there are other types, like FETCH_COMMITTED, as well?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > Here is the proposal to refactor the Kafka Consumer <https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor>. The 1-pager is on the wiki, so please take a look at it. Also, this is a prerequisite for KIP-848 (the next-gen rebalance protocol).
>> > > > > > > >
>> > > > > > > > Cheers,
>> > > > > > > > P
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > >
>> > > > > --
>> > > > > -- Guozhang