Re: KafkaConsumer refactor proposal

2023-07-14 Thread Kirk True
Hi Erik,

Regarding the consumer refactor project, we’re in the process of converting 
Philip’s design to a “proper” KIP here:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-945%3A+Update+threading+model+for+Consumer
 

It’s still very much a draft and not ready for a formal DISCUSS thread, but 
we’d welcome feedback.

That said, the callback issue being discussed here may be better served with a 
dedicated KIP so as not to entangle the fate of one with the other.

Thanks,
Kirk

> On Jul 13, 2023, at 11:44 AM, Erik van Oosten  
> wrote:
> 
> Hi Philip,
> 
> I have been scanning through 
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
>  and KIP-848 and from this I understand that the kafka consumer API will not 
> change.
> 
> Perhaps the refactoring and/or KIP-848 is a good opportunity to improve the 
> API somewhat. In this email I explain why and also give a rough idea what 
> that could look like.
> 
> In the current API, the rebalance listener callback gives the user a chance 
> to commit all work in progress before a partition is actually revoked and 
> assigned to another consumer.
> 
> While the callback is doing all this, the main user thread is not able to 
> process new incoming data. So the rebalance listener affects latency and 
> throughput for non-revoked partitions during a rebalance.
> 
> In addition, I feel that doing a lot of stuff /in/ a callback is always quite 
> awkward. Better only use it to trigger some processing elsewhere.
> 
> Therefore, I would like to propose a new API that does not have these 
> problems and is easy to use (and I hope still easy to implement). In my ideal 
> world, poll is the only method that you need. Let's call it poll2 (to do: come 
> up with a less crappy name). Poll2 returns more than just the polled records, 
> it will also contain newly assigned partitions, partitions that will be 
> revoked during the next call to poll2, partitions that were lost, and perhaps 
> it will even contain the offsets committed so far.
> 
> The most important idea here is that partitions are not revoked immediately, 
> but in the next call to poll2.
> 
> With this API, a user can commit offsets at their own pace during a 
> rebalance. Optionally, for the case that processing of data from the 
> to-be-revoked partition is still ongoing, we allow the user to postpone the 
> actual revocation to a later poll, so that polling can continue for other 
> partitions.
> 
> Since we are no longer blocking the main user thread, partitions that are not 
> revoked can be processed at full speed.
> 
> Removal of the rebalance listener also makes the API safer; there is no more 
> need for the thread-id check (nor KIP-944) because concurrent invocations 
> are simply no longer needed. (Of course, if backward compatibility is a goal, 
> not all of these things can be done.)
> 
> Curious to hear your thoughts, and kind regards,
> Erik.
> 
> -- 
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com
> 



Re: KafkaConsumer refactor proposal

2023-07-14 Thread Kirk True
Hi Erik,

Thanks for your interest in the client world :)

I agree that the current implementation of how we handle callbacks is 
problematic in that it introduces race conditions and/or bottlenecks.

I don’t have as much experience with the callbacks from an application 
developer standpoint. Is it critical for the invocation of the callbacks to 
happen within the processing of the commit/rebalance/etc.? If callbacks could 
be redesigned to *not* be in the critical path, would that break user 
applications?

Thanks,
Kirk

> On Jul 13, 2023, at 11:44 AM, Erik van Oosten  
> wrote:
> 
> Hi Philip,
> 
> I have been scanning through 
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
>  and KIP-848 and from this I understand that the kafka consumer API will not 
> change.
> 
> Perhaps the refactoring and/or KIP-848 is a good opportunity to improve the 
> API somewhat. In this email I explain why and also give a rough idea what 
> that could look like.
> 
> In the current API, the rebalance listener callback gives the user a chance 
> to commit all work in progress before a partition is actually revoked and 
> assigned to another consumer.
> 
> While the callback is doing all this, the main user thread is not able to 
> process new incoming data. So the rebalance listener affects latency and 
> throughput for non-revoked partitions during a rebalance.
> 
> In addition, I feel that doing a lot of stuff /in/ a callback is always quite 
> awkward. Better only use it to trigger some processing elsewhere.
> 
> Therefore, I would like to propose a new API that does not have these 
> problems and is easy to use (and I hope still easy to implement). In my ideal 
> world, poll is the only method that you need. Let's call it poll2 (to do: come 
> up with a less crappy name). Poll2 returns more than just the polled records, 
> it will also contain newly assigned partitions, partitions that will be 
> revoked during the next call to poll2, partitions that were lost, and perhaps 
> it will even contain the offsets committed so far.
> 
> The most important idea here is that partitions are not revoked immediately, 
> but in the next call to poll2.
> 
> With this API, a user can commit offsets at their own pace during a 
> rebalance. Optionally, for the case that processing of data from the 
> to-be-revoked partition is still ongoing, we allow the user to postpone the 
> actual revocation to a later poll, so that polling can continue for other 
> partitions.
> 
> Since we are no longer blocking the main user thread, partitions that are not 
> revoked can be processed at full speed.
> 
> Removal of the rebalance listener also makes the API safer; there is no more 
> need for the thread-id check (nor KIP-944) because concurrent invocations 
> are simply no longer needed. (Of course, if backward compatibility is a goal, 
> not all of these things can be done.)
> 
> Curious to hear your thoughts, and kind regards,
> Erik.
> 
> -- 
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com
> 



Re: KafkaConsumer refactor proposal

2023-07-13 Thread Erik van Oosten

Hi Philip,

I have been scanning through 
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design 
and KIP-848 and from this I understand that the kafka consumer API will 
not change.


Perhaps the refactoring and/or KIP-848 is a good opportunity to improve 
the API somewhat. In this email I explain why and also give a rough idea 
what that could look like.


In the current API, the rebalance listener callback gives the user a 
chance to commit all work in progress before a partition is actually 
revoked and assigned to another consumer.
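
For reference, the pattern that today's API encourages looks roughly like 
this (a minimal sketch; the topic name and the commit policy are 
placeholders):

import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class CommitOnRevoke {
    static void subscribe(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Runs on the user's poll thread: while we commit work in
                // progress here, no partition can make any progress.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Typically used to (re)initialize per-partition state.
            }
        });
    }
}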


While the callback is doing all this, the main user thread is not able 
to process new incoming data. So the rebalance listener affects latency 
and throughput for non-revoked partitions during a rebalance.


In addition, I feel that doing a lot of stuff /in/ a callback is always 
quite awkward. Better only use it to trigger some processing elsewhere.


Therefore, I would like to propose a new API that does not have these 
problems and is easy to use (and I hope still easy to implement). In my 
ideal world, poll is the only method that you need. Let's call it poll2 
(to do: come up with a less crappy name). Poll2 returns more than just 
the polled records, it will also contain newly assigned partitions, 
partitions that will be revoked during the next call to poll2, 
partitions that were lost, and perhaps it will even contain the offsets 
committed so far.


The most important idea here is that partitions are not revoked 
immediately, but in the next call to poll2.
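
To make this concrete, here is a rough sketch of what the result of poll2 
could look like (every name below is invented for illustration; none of 
this exists in the current consumer API):

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical return type; all names are placeholders.
public interface Poll2Result<K, V> {
    ConsumerRecords<K, V> records();
    Set<TopicPartition> newlyAssignedPartitions();
    // Revoked only on the *next* call to poll2, giving the application a
    // full poll cycle to commit and wind down processing.
    Set<TopicPartition> partitionsPendingRevocation();
    Set<TopicPartition> lostPartitions();
    Map<TopicPartition, OffsetAndMetadata> committedOffsets();
}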


With this API, a user can commit offsets at their own pace during a 
rebalance. Optionally, for the case that processing of data from the 
to-be-revoked partition is still ongoing, we allow the user to postpone 
the actual revocation to a later poll, so that polling can continue for 
other partitions.


Since we are no longer blocking the main user thread, partitions that 
are not revoked can be processed at full speed.


Removal of the rebalance listener also makes the API safer; there is no 
more need for the thread-id check (nor KIP-944) because concurrent 
invocations are simply no longer needed. (Of course, if backward 
compatibility is a goal, not all of these things can be done.)


Curious to hear your thoughts, and kind regards,
    Erik.

--
Erik van Oosten
e.vanoos...@grons.nl
https://day-to-day-stuff.blogspot.com



Re: KafkaConsumer refactor proposal

2023-07-10 Thread Philip Nee
Hey Divij,

Sorry for the late reply - I was away for an extended period of time, but
I'm back to address some of the questions here.

To your point, we will incorporate your suggestions into the 1-pager, as
well as the future KIP-945 (we decided to create a KIP for this work for
better visibility).

Here is my feedback on your suggestions; I will respond to each of your
bullet points in turn:
1. For the current design, the background thread and the main thread handle
different types of tasks, and the background thread should continue making
the system calls even if the user thread is under high load.  However,
currently, we have not implemented prioritization because all calls should be
sent out by the end of each event loop.  Since the background thread is
meant to be fairly lightweight, handling only the inbound and outbound
network requests, I believe this addresses your concern.
2. The background thread will poll until the heartbeat or metadata update
timer expires, or until new events come in.  In this case, the background
thread will not busy-wait for the next event loop iteration.
3. Similar to #1, the application thread works independently of the
background thread, so there shouldn't be any concern about blocking
behavior.
4. We implemented batching for a few API calls but not all.  We will
implement this optimization after having a working client!
5. The network util should check the timer of each unsent request before
sending it out.  The request will be removed if its timer expires.
6. The network connections are reused, I believe.

7. The queues aren't used for system calls.  The system calls are issued by
the "Request Manager" at the top of the event loop, and sent out by the end
of the event loop.  As long as the background thread is running, the
heartbeat and metadata updates should continue to happen.  To clarify: we
use the queues only for communication.  The application event queue sends
API calls to the background thread, and the background event queue allows
the background thread to notify the user about errors and events like
callback invocation (see the sketch at the end of this list).
8. Noted.  KAFKA-15173 was added per your suggestion.

9. Noted: KAFKA-15173 was added per your suggestion.
10. Noted. We will spend time examining this item.
11. Noted.
12. Noted.

13. Similar to #7: I think this is the optimization we need to investigate
after delivering the client.  KAFKA-15175 was created for this.
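
To make the communication pattern in #7 concrete, here is a rough sketch;
the event and queue names below are simplified stand-ins, not the actual
classes:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

final class ConsumerChannels {
    interface ApplicationEvent {}  // polling thread -> background thread (API calls)
    interface BackgroundEvent {}   // background thread -> polling thread (errors, callbacks)

    record CommitApplicationEvent(Map<TopicPartition, OffsetAndMetadata> offsets)
            implements ApplicationEvent {}
    record ErrorBackgroundEvent(KafkaException error) implements BackgroundEvent {}

    final BlockingQueue<ApplicationEvent> applicationEvents = new LinkedBlockingQueue<>();
    final BlockingQueue<BackgroundEvent> backgroundEvents = new LinkedBlockingQueue<>();
}

The polling thread enqueues, e.g., a CommitApplicationEvent; the request
managers turn it into network requests, and errors or callback triggers
flow back through backgroundEvents.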

Thanks!
P


On Wed, May 24, 2023 at 7:20 AM Divij Vaidya 
wrote:

> Thank you Philip for the response. I will post more comments about this in
> the upcoming days but meanwhile here are my initial thoughts. Please let me
> know what you think about them.
>
> Addition to terminology
> Let's divide the API calls from the consumer into two categories. System
> API calls and User API calls.
>
> System APIs are the ones that are required for ensuring that
> consumers remain connected to the correct broker. Examples of system API
> calls would be heartbeat, metadata etc.
> User APIs are the ones which are invoked on-demand by the user. Examples of
> user API calls would be poll, commit etc.
>
> Now, may I suggest adding the following set of requirements/principles
> which the design should uphold:
> 1. System API calls should work seamlessly even when the consumer is under
> high load from user API calls.
> 2. Threads should consume minimal CPU when not doing any work. i.e. threads
> should not be performing busy waits (infinite loops without any block)
> while idle.
> 3. It should not be possible to block the background thread by the
> application/user.
> 4. Multiple occurrences of idempotent API requests should be bulked
> together. Note that this is different from batch. In bulking, we execute
> one request and use its response as the response to other duplicates /
> similar requests.
> 5. Requests which have exceeded their timeout should not be sent to the
> broker.
> 6. Design should minimize the number of connections created to the broker.
> This means that both system and user API requests should use the same
> network connection. It also means that, in this design, we are not solving
> the problem of network layer starvation of system requests due to too many
> user requests.
>
> Based on the above requirements, I would suggest the following changes:
> 7. We should consider adding another requestQueue (similar to
> ApplicationEvent queue) which will contain only the system events. The
> background thread will first dequeue from this system queue and afterwards
> go to the user event queue (ApplicationEvent queue). A daemon thread for
> each system event will regularly add events to the system queue at a fixed
> periodic interval. As a future optimization, we will piggy-back heartbeat
> with other requests such that any request from the client to the broker
> will be 

Re: KafkaConsumer refactor proposal

2023-05-24 Thread Divij Vaidya
Thank you Philip for the response. I will post more comments about this in
the upcoming days but meanwhile here are my initial thoughts. Please let me
know what you think about them.

Addition to terminology
Let's divide the API calls from the consumer into two categories. System
API calls and User API calls.

System APIs are the ones that are required for ensuring that
consumers remain connected to the correct broker. Examples of system API
calls would be heartbeat, metadata etc.
User APIs are the ones which are invoked on-demand by the user. Examples of
user API calls would be poll, commit etc.

Now, may I suggest adding the following set of requirements/principles
which the design should uphold:
1. System API calls should work seamlessly even when the consumer is under
high load from user API calls.
2. Threads should consume minimal CPU when not doing any work. i.e. threads
should not be performing busy waits (infinite loops without any block)
while idle.
3. It should not be possible to block the background thread by the
application/user.
4. Multiple occurrences of idempotent API requests should be bulked
together. Note that this is different from batch. In bulking, we execute
one request and use its response as the response to other duplicates /
similar requests.
5. Requests which have exceeded their timeout should not be sent to the
broker.
6. Design should minimize the number of connections created to the broker.
This means that both system and user API requests should use the same
network connection. It also means that, in this design, we are not solving
the problem of network layer starvation of system requests due to too many
user requests.

Based on the above requirements, I would suggest the following changes:
7. We should consider adding another requestQueue (similar to
ApplicationEvent queue) which will contain only the system events. The
background thread will first dequeue from this system queue and afterwards
go to the user event queue (ApplicationEvent queue). A daemon thread for
each system event will regularly add events to the system queue at a fixed
periodic interval. As a future optimization, we will piggy-back heartbeat
with other requests such that any request from the client to the broker
will be considered as a heartbeat. This solves the prioritization and noisy
neighbour problem for us amongst the system and user APIs (see the sketches
after this list).
8. We should add assertions in the callbacks to ensure that the correct
thread is executing them. As an example, I have found the pattern used by
Netty very useful [1].
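
As an illustration of point 7, the background thread could drain the two
queues along these lines (a sketch; the element types and names are assumed):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PrioritizedEventLoop {
    private final BlockingQueue<Runnable> systemQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Runnable> applicationQueue = new LinkedBlockingQueue<>();

    void runOnce() throws InterruptedException {
        // Drain system events (heartbeat, metadata) first so that they are
        // never starved by a backlog of user API calls.
        Runnable system;
        while ((system = systemQueue.poll()) != null) {
            system.run();
        }
        // Then handle at most one user event per iteration.
        Runnable user = applicationQueue.poll(100, TimeUnit.MILLISECONDS);
        if (user != null) {
            user.run();
        }
    }
}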
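
And for point 8, a minimal thread-ownership assertion in the spirit of
Netty's inEventLoop() check (names are assumed):

final class ThreadGuard {
    private final Thread owner;

    ThreadGuard(Thread owner) {
        this.owner = owner;
    }

    // Call at the top of every callback that must run on the owner thread.
    void check() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("Expected to run on " + owner.getName()
                + " but was on " + Thread.currentThread().getName());
        }
    }
}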

Handling of failure scenarios
The document is missing an explanation of how various failure scenarios
are handled. For example, we should document the behaviour for the following:
9. When the ApplicationEvent queue gets full. Currently, it is an unbounded
queue and will fail with an OutOfMemoryError. We may want to bound it
with a user-configurable value. Bounding will help us fail gracefully
instead of crashing with OOM. The graceful failure in this case should be
in the form of a retriable exception to the user with an instruction to
adopt an exponential backoff and retry (a sketch follows this list).
10. When the background thread gets interrupted / killed for some reason,
we should be able to detect it and close the consumer gracefully.
11. We should document how the tear-down of the consumer is handled. The
unsent requests sitting in the queue will probably be discarded and the
in-flight futures completed exceptionally. We should let the user know
(via a log) the number of requests that are being discarded.
12. We should document how the timeouts for APIs which are time-bound are
handled. As an example, if the requests are still in the queue / unsentList
when the timeout hits, do we even want to send them across to the server?
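
For point 9, the graceful failure could take roughly this shape (a sketch;
the exception type is hypothetical, and in practice it would extend Kafka's
retriable exception hierarchy):

import java.util.concurrent.ArrayBlockingQueue;

final class BoundedEventQueue<E> {
    // Hypothetical exception; callers treat it as retriable.
    static final class QueueFullException extends RuntimeException {
        QueueFullException(String message) { super(message); }
    }

    private final ArrayBlockingQueue<E> queue;

    BoundedEventQueue(int capacity) {  // capacity is user-configurable
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void enqueue(E event) {
        if (!queue.offer(event)) {
            // Fail fast instead of growing without bound; the caller should
            // back off exponentially and retry.
            throw new QueueFullException("application event queue is full");
        }
    }
}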

Future improvements:
13. Lastly, as a future improvement, we should probably look at using Java
NIO2's asynchronous socket channel APIs (instead of the current
implementation of blocking network IO in Kafka). It would greatly improve
the background thread's performance since, unlike on the broker, this is the
single thread doing most of the work on the consumer.

Once we have reached a conclusion on the above points, perhaps, we can
create JIRAs to address them and work with the community members to resolve
them.

[1]
https://github.com/netty/netty/blob/57d536c59e434e88c0d8374d6b03671edb02885f/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java#L65



--
Divij Vaidya



On Tue, May 23, 2023 at 6:57 PM Philip Nee  wrote:

> Hi all!
>
> I've added the "KIP-848" and "ctr" tags to the relevant PRs.
>
> see:
> https://github.com/apache/kafka/pulls?q=label%3Actr+sort%3Acreated-asc
>
> On Tue, May 9, 2023 at 10:29 AM Philip Nee  wrote:
>
> > Hey Divij,
> >
> > Firstly, THANK YOU for reaching out and providing the feedback.  At this
> > moment, I don't have all of the answers to your questions, but I hope my
> > responses could be helpful in moving the project forward.
> >
> > 1. 

Re: KafkaConsumer refactor proposal

2023-05-23 Thread Philip Nee
Hi all!

I've added the "KIP-848" and "ctr" tags to the relevant PRs.

see: https://github.com/apache/kafka/pulls?q=label%3Actr+sort%3Acreated-asc

On Tue, May 9, 2023 at 10:29 AM Philip Nee  wrote:

> Hey Divij,
>
> Firstly, THANK YOU for reaching out and providing the feedback.  At this
> moment, I don't have all of the answers to your questions, but I hope my
> responses could be helpful in moving the project forward.
>
> 1. There isn't a prioritization strategy, and the network request buffer
> is just FIFO. Basically, all requests are buffered in the network queue and
> sent out at the end of the event loop where we poll the client.
> Presumably, your concern is that the member can get constantly kicked out
> of the group due to not sending out a heartbeat soon enough under high load
> or network congestion.  I think that's a fair point.  On the other hand, we
> design the background thread (or we would like the design to be...) to be
> lightweight and focus on network communication, such that the client can
> quickly go through all the requests without consuming too many resources.
> In the case of network congestion, I think there could be other reasons
> that cause members to fail, so I don't know if we need to prioritize the
> requests.
>
> 2. Yes, as of now, the fetch offset request is being de-duped, and we
> would do that for other requests as well.  There's also a RequestState
> class that helps request managers track retry backoff (exponential
> backoff), which prevents members from spamming brokers.
>
> 3. There's no implementation of the backpressure for both the application
> and background event queue, but here is my thought:
>  - For ApplicationEventQueue: I can think of a few ways to implement this.
> 1. the user handles a RetriableError and re-enqueues the event, or 2. having
> the application buffer the events, but then we are just implementing a
> queue on top of the queue.  What's your suggestion here?
>  - For BackgroundEventQueue: The main events are fetches, errors, and
> rebalance triggers.  For fetching, what's in my mind is some kind of memory
> bound mechanism like KIP-81
> 
>  (it
> hasn't been implemented).  For rebalancing events or errors, there is not
> much we could do aside from pausing the request managers.  What do you
> think?
>
> 4. See above.
>
> 5. I'll add an Observability section to the one-pager. My thinking is
> similar to yours: 1. memory and CPU pressure and 2. event latency.
>
> I would love to incorporate some of your past experience from TinkerPop
> into this design, so thank you again for sharing your feedback!
>
> Thank you!
> P
>
> On Tue, May 9, 2023 at 9:50 AM Divij Vaidya 
> wrote:
>
>> Hey Philip
>>
>> Thank you for writing this down.
>>
>> Overall, the idea of decoupling the responsibility amongst threads and
>> communication using queues looks good. I have a few questions
>> associated with handling failure scenarios.
>>
>> 1. I observed that there is no concept of prioritization in the background
>> thread. In case of high load, should a heartbeat handler be prioritized
>> over listOffset handler?
>> 2. Idempotent requests could be "bulked" together, e.g. if we have two
>> list
>> offsets in the queue, we should be able to call listOffset once and
>> duplicate the response for the requests
>> 3. What would the back pressure handling behaviour from the
>> applicationEventQueue look like? This is bound to occur during
>> intermittent
>> network disruption
>> 4. Same as 3 but for backgroundEventQueue. If the client thread is doing
>> some I/O since the user is blocking it in their business logic, then the
>> queue may get full. What happens then? How do we recover and cancel the
>> resources such as waiting sync APIs?
>> 5. Can we add observability/metrics in the design? As an example, the end
>> user might measure latency from the client thread but the request may
>> actually be spending time in the queue waiting to be picked by the
>> background thread. In such cases, user should have the visibility to
>> identify that a need to scale the background thread processing is
>> required.
>>
>> --
>> Divij Vaidya
>>
>>
>>
>> On Sun, Sep 18, 2022 at 11:24 PM Philip Nee  wrote:
>>
>> > On Sun, Sep 18, 2022 at 6:03 AM Luke Chen  wrote:
>> >
>> > > Hi Philip,
>> > >
>> > > Thanks for the write-up.
>> > >
>> > Also thank you for taking the time to read the proposal.  Very grateful.
>> >
>> > > Some questions:
>> > >
>> > > 1. You said: If we don't need a coordinator, the background thread
>> will
>> > > stay in the *initialized* state.
>> > > But in the definition of *initialized, *it said:
>> > > *initialized*: The background thread completes initialization, and the
>> > loop
>> > > has started.
>> > > Does that mean, even if we don't need a coordinator, we still create a
>> > > coordinator and run the loop? And why?
>> > >
>> > If we 

Re: KafkaConsumer refactor proposal

2023-05-09 Thread Philip Nee
Hey Divij,

Firstly, THANK YOU for reaching out and providing the feedback.  At this
moment, I don't have all of the answers to your questions, but I hope my
responses could be helpful in moving the project forward.

1. There isn't a prioritization strategy, and the network request buffer is
just FIFO. Basically, all requests are buffered in the network queue and
sent out at the end of the event loop where we poll the client.
Presumably, your concern is that the member can get constantly kicked out
of the group due to not sending out a heartbeat soon enough under high load
or network congestion.  I think that's a fair point.  On the other hand, we
design the background thread (or we would like the design to be...) to be
lightweight and focus on network communication, such that the client can
quickly go through all the requests without consuming too many resources.
In the case of network congestion, I think there could be other reasons
that cause members to fail, so I don't know if we need to prioritize the
requests.

2. Yes, as of now, the fetch offset request is being de-duped, and we would
do that for other requests as well.  There's also a RequestState class that
helps request managers track retry backoff (exponential backoff), which
prevents members from spamming brokers.
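
For illustration, the backoff computation could look something like this (a
sketch in the spirit of what RequestState tracks, not the actual class):

import java.util.concurrent.ThreadLocalRandom;

final class ExponentialBackoff {
    // Doubles the wait per attempt, caps it, and adds jitter so that a fleet
    // of consumers does not retry in lockstep.
    static long backoffMs(int attempts, long initialMs, long maxMs) {
        long exp = initialMs << Math.min(attempts, 20);
        long capped = Math.min(Math.max(exp, initialMs), maxMs);
        return ThreadLocalRandom.current().nextLong(capped / 2, capped + 1);
    }
}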

3. There's no implementation of the backpressure for both the application
and background event queue, but here is my thought:
 - For ApplicationEventQueue: I can think of a few ways to implement this:
1. the user handles a RetriableError and re-enqueues the event, or 2. having
the application buffer the events, but then we are just implementing a
queue on top of the queue.  What's your suggestion here?
 - For BackgroundEventQueue: The main events are fetches, errors, and
rebalance triggers.  For fetching, what's in my mind is some kind of memory
bound mechanism like KIP-81

(it
hasn't been implemented).  For rebalancing events or errors, there is not
much we could do aside from pausing the request managers.  What do you
think?

4. See above.

5. I'll add an Observability section to the one-pager. My thinking is
similar to yours: 1. memory and CPU pressure and 2. event latency.

I would love to incorporate some of your past experience from TinkerPop
into this design, so thank you again for sharing your feedback!

Thank you!
P

On Tue, May 9, 2023 at 9:50 AM Divij Vaidya  wrote:

> Hey Philip
>
> Thank you for writing this down.
>
> Overall, the idea of decoupling the responsibility amongst threads and
> communication using queues looks good. I have a few questions
> associated with handling failure scenarios.
>
> 1. I observed that there is no concept of prioritization in the background
> thread. In case of high load, should a heartbeat handler be prioritized
> over listOffset handler?
> 2. Idempotent requests could be "bulked" together, e.g. if we have two list
> offsets in the queue, we should be able to call listOffset once and
> duplicate the response for the requests
> 3. What would the back pressure handling behaviour from the
> applicationEventQueue look like? This is bound to occur during intermittent
> network disruption
> 4. Same as 3 but for backgroundEventQueue. If the client thread is doing
> some I/O since the user is blocking it in their business logic, then the
> queue may get full. What happens then? How do we recover and cancel the
> resources such as waiting sync APIs?
> 5. Can we add observability/metrics in the design? As an example, the end
> user might measure latency from the client thread but the request may
> actually be spending time in the queue waiting to be picked by the
> background thread. In such cases, user should have the visibility to
> identify that a need to scale the background thread processing is required.
>
> --
> Divij Vaidya
>
>
>
> On Sun, Sep 18, 2022 at 11:24 PM Philip Nee  wrote:
>
> > On Sun, Sep 18, 2022 at 6:03 AM Luke Chen  wrote:
> >
> > > Hi Philip,
> > >
> > > Thanks for the write-up.
> > >
> > Also thank you for taking the time to read the proposal.  Very grateful.
> >
> > > Some questions:
> > >
> > > 1. You said: If we don't need a coordinator, the background thread will
> > > stay in the *initialized* state.
> > > But in the definition of *initialized, *it said:
> > > *initialized*: The background thread completes initialization, and the
> > loop
> > > has started.
> > > Does that mean, even if we don't need a coordinator, we still create a
> > > coordinator and run the loop? And why?
> > >
> > If we don't need a coordinator, I think the background thread should not
> > fire a FindCoordinator request, which is what is implemented today right?
> > In terms of object instantiation, I think we can instantiate the object
> on
> > demand.
> >
> > >
> > > 2. Why do we need so many rebalance states?
> > > After this proposal, we'll change the state number from 4 to 8.

Re: KafkaConsumer refactor proposal

2023-05-09 Thread Divij Vaidya
Hey Philip

Thank you for writing this down.

Overall, the idea of decoupling the responsibility amongst threads and
communication using queues looks good. I have a few questions
associated with handling failure scenarios.

1. I observed that there is no concept of prioritization in the background
thread. In case of high load, should a heartbeat handler be prioritized
over listOffset handler?
2. Idempotent requests could be "bulked" together, e.g. if we have two list
offsets in the queue, we should be able to call listOffset once and
duplicate the response for the requests (see the sketch after this list)
3. What would the back pressure handling behaviour from the
applicationEventQueue look like? This is bound to occur during intermittent
network disruption
4. Same as 3 but for backgroundEventQueue. If the client thread is doing
some I/O since the user is blocking it in their business logic, then the
queue may get full. What happens then? How do we recover and cancel the
resources such as waiting sync APIs?
5. Can we add observability/metrics in the design? As an example, the end
user might measure latency from the client thread but the request may
actually be spending time in the queue waiting to be picked by the
background thread. In such cases, the user should have the visibility to
identify that the background thread processing needs to be scaled.
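
To illustrate the "bulking" in point 2 (referenced above): duplicate
idempotent requests could share a single in-flight future instead of each
going to the broker (a sketch; all names are assumed):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class InflightDeduplicator<K, R> {
    private final ConcurrentHashMap<K, CompletableFuture<R>> inflight =
        new ConcurrentHashMap<>();

    CompletableFuture<R> send(K key, Function<K, CompletableFuture<R>> doSend) {
        CompletableFuture<R> shared = new CompletableFuture<>();
        CompletableFuture<R> existing = inflight.putIfAbsent(key, shared);
        if (existing != null) {
            return existing;               // duplicate: reuse the in-flight call
        }
        doSend.apply(key).whenComplete((response, error) -> {
            inflight.remove(key, shared);  // later requests hit the broker again
            if (error != null) {
                shared.completeExceptionally(error);
            } else {
                shared.complete(response);
            }
        });
        return shared;
    }
}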

--
Divij Vaidya



On Sun, Sep 18, 2022 at 11:24 PM Philip Nee  wrote:

> On Sun, Sep 18, 2022 at 6:03 AM Luke Chen  wrote:
>
> > Hi Philip,
> >
> > Thanks for the write-up.
> >
> Also thank you for taking the time to read the proposal.  Very grateful.
>
> > Some questions:
> >
> > 1. You said: If we don't need a coordinator, the background thread will
> > stay in the *initialized* state.
> > But in the definition of *initialized, *it said:
> > *initialized*: The background thread completes initialization, and the
> loop
> > has started.
> > Does that mean, even if we don't need a coordinator, we still create a
> > coordinator and run the loop? And why?
> >
> If we don't need a coordinator, I think the background thread should not
> fire a FindCoordinator request, which is what is implemented today right?
> In terms of object instantiation, I think we can instantiate the object on
> demand.
>
> >
> > 2. Why do we need so many rebalance states?
> > After this proposal, we'll change the state number from 4 to 8.
> > And we will have many state changes and state check in the code. Could
> you
> > explain why those states are necessary?
> > I can imagine, like the method: `rebalanceInProgress`, will need to check
> > if the current state in
> >
> >
> PREPARE_REVOCATION/REVOKING_PARTITION/PARTITION_REVOKED/PREPARING_REBALANCE/.../...
> > .
> > So, maybe you should explain why these states are necessary. To me, like
> > PREPARE_REVOCATION/REVOKING_PARTITION, I don't understand why we need 2
> > states for them? Any reason there?
> >
> I think we need to do 3 things when revoking a partition: before
> the revocation, during the revocation, and after the revocation.
>   1. Before: prepare the revocation, i.e., pausing the data, send out
> commits (as of the current implementation), and send an event to the
> polling thread to invoke the callback
>   2. During: Waiting for the callback to be triggered.  Meanwhile, the
> background thread will continue to poll the channel, until the ack event
> arrives
>   3. After: Once the invocation ack event is received, we move on to
> request to join the group.
>
> Maybe you are right, perhaps, we don't need the PARTITION_REVOKED state.
>
> >
> > 3. How could we handle the Subscription state change in poll thread
> earlier
> > than REVOKE_PARTITION/ASSIGN_PARTITION events arrived?
> > Suppose, background thread sending REVOKE_PARTITION event to poll thread,
> > and before handling it, the consumer updates the subscription.
> > So in this situation, we'll still invoke revoke callback or not?
> > This won't happen in the current design because they are handled in one
> thread.
> >
>
> Thanks for bringing this to the table. Would it make sense to lock the
> subscriptionState in some scenarios like this? In this case, this requires
> API changes, like throwing an exception to tell the users that the
> revocation is happening.
>
> >
> > 4. *Is there a better way to configure session interval and heartbeat
> > interval?*
> > These configs are moved to broker side in KIP-848 IIRC.
> > Maybe you can check that KIP and update here.
> >
> Thanks.
>
> >
> > Some typos and hard to understand
> >
> Hmm, terribly sorry about these typos.  Turned out I didn't publish the
> updated copy.  Will address them and update the document right away.
>
>
> > 5. Firstly, it _mcomplexfixes_ increasingly difficult ???
> > 6. it simplifies the _pd_ design ? What is pd?
> > 7. In "Background thread and its lifecycle" section, I guess the 2 points
> > should be 3c and 3d, right?
> > That is:
> > a. Check if there is an in-flight event. If not, poll for the new events
> > from 

Re: KafkaConsumer refactor proposal

2022-09-18 Thread Philip Nee
On Sun, Sep 18, 2022 at 6:03 AM Luke Chen  wrote:

> Hi Philip,
>
> Thanks for the write-up.
>
Also thank you for taking the time to read the proposal.  Very grateful.

> Some questions:
>
> 1. You said: If we don't need a coordinator, the background thread will
> stay in the *initialized* state.
> But in the definition of *initialized*, it said:
> *initialized*: The background thread completes initialization, and the loop
> has started.
> Does that mean, even if we don't need a coordinator, we still create a
> coordinator and run the loop? And why?
>
If we don't need a coordinator, I think the background thread should not
fire a FindCoordinator request, which is what is implemented today, right?
In terms of object instantiation, I think we can instantiate the object on
demand.

>
> 2. Why do we need so many rebalance states?
> After this proposal, we'll change the state number from 4 to 8.
> And we will have many state changes and state check in the code. Could you
> explain why those states are necessary?
> I can imagine, like the method: `rebalanceInProgress`, will need to check
> if the current state in
>
> PREPARE_REVOCATION/REVOKING_PARTITION/PARTITION_REVOKED/PREPARING_REBALANCE/.../...
> .
> So, maybe you should explain why these states are necessary. To me, like
> PREPARE_REVOCATION/REVOKING_PARTITION, I don't understand why we need 2
> states for them? Any reason there?
>
I think we need to do 3 things when revoking a partition: before
the revocation, during the revocation, and after the revocation.
  1. Before: prepare the revocation, i.e., pausing the data, send out
commits (as of the current implementation), and send an event to the
polling thread to invoke the callback
  2. During: Waiting for the callback to be triggered.  Meanwhile, the
background thread will continue to poll the channel, until the ack event
arrives
  3. After: Once the invocation ack event is received, we move on to
request to join the group.

Maybe you are right, perhaps, we don't need the PARTITION_REVOKED state.

>
> 3. How could we handle the Subscription state change in poll thread earlier
> than REVOKE_PARTITION/ASSIGN_PARTITION events arrived?
> Suppose, background thread sending REVOKE_PARTITION event to poll thread,
> and before handling it, the consumer updates the subscription.
> So in this situation, we'll still invoke revoke callback or not?
> This won't happen in the current design because they are handled in one thread.
>

Thanks for bringing this to the table. Would it make sense to lock the
subscriptionState in some scenarios like this? In this case, this requires
API changes, like throwing an exception to tell the users that the
revocation is happening.

>
> 4. *Is there a better way to configure session interval and heartbeat
> interval?*
> These configs are moved to broker side in KIP-848 IIRC.
> Maybe you can check that KIP and update here.
>
Thanks.

>
> Some typos and hard to understand
>
Hmm, terribly sorry about these typos.  Turned out I didn't publish the
updated copy.  Will address them and update the document right away.


> 5. Firstly, it _mcomplexfixes_ increasingly difficult ???
> 6. it simplifies the _pd_ design ? What is pd?
> 7. In "Background thread and its lifecycle" section, I guess the 2 points
> should be 3c and 3d, right?
> That is:
> a. Check if there is an in-flight event. If not, poll for the new events
> from the channel.
> b. Run the state machine, and handle the following scenarios:
> c. Poll the networkClient.
> d. Backoff for retryBackoffMs milliseconds.
>
> Thank you.
>
Again, really grateful for you to review the proposal, so thank you!

P

> Luke
>
> On Sat, Sep 17, 2022 at 6:29 AM Philip Nee  wrote:
>
> > On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang 
> wrote:
> >
> > > Hi Philip,
> > >
> > > On Fri, Sep 16, 2022 at 1:20 PM Philip Nee 
> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thank you for the reviews, and I agree with your suggestions: If we
> > find
> > > > any behavior changes, we will issue a KIP. Is it possible to first
> > > publish
> > > > the new implementation in a separate class so that we don't disrupt
> the
> > > > existing implementation? Then we can issue a KIP to address the
> > > > regressions.
> > > >
> > > > Yes! That's what I meant to clarify. Just in case other people may be
> > > confused why we did not propose a KIP right away, but first as a
> > one-pager.
> > >
> > >
> > > > To your questions:
> > > > 1. I think the coordinator discovery will happen on demand, i.e., the
> > > > background thread will only try to discover a coordinator when the
> > event
> > > > (e.g., commit) requires one. I've noted that, and I'll make sure that
> > in
> > > > the proposal.
> > > >
> > > Thanks. Could you then clarify that for the coordinator state
> transition?
> > > Is that just between `down` and `initialized`, and then `initialized`
> can
> > > then transition back to `down` too?
> > >
> >
> > Will do, I'll update the 1 pager to clarify it.

Re: KafkaConsumer refactor proposal

2022-09-18 Thread Luke Chen
Hi Philip,

Thanks for the write-up.
Some questions:

1. You said: If we don't need a coordinator, the background thread will
stay in the *initialized* state.
But in the definition of *initialized*, it said:
*initialized*: The background thread completes initialization, and the loop
has started.
Does that mean, even if we don't need a coordinator, we still create a
coordinator and run the loop? And why?

2. Why do we need so many rebalance states?
After this proposal, we'll change the state number from 4 to 8.
And we will have many state changes and state check in the code. Could you
explain why those states are necessary?
I can imagine that a method like `rebalanceInProgress` will need to check
whether the current state is in
PREPARE_REVOCATION/REVOKING_PARTITION/PARTITION_REVOKED/PREPARING_REBALANCE/.../...
.
So, maybe you should explain why these states are necessary. For example,
for PREPARE_REVOCATION/REVOKING_PARTITION, I don't understand why we need 2
separate states. Any reason there?

3. How could we handle a Subscription state change in the poll thread earlier
than when the REVOKE_PARTITION/ASSIGN_PARTITION events arrive?
Suppose the background thread sends a REVOKE_PARTITION event to the poll thread,
and before handling it, the consumer updates the subscription.
So in this situation, will we still invoke the revoke callback or not?
This won't happen in the current design because they are handled in one thread.

4. *Is there a better way to configure session interval and heartbeat
interval?*
These configs are moved to broker side in KIP-848 IIRC.
Maybe you can check that KIP and update here.

Some typos and hard-to-understand passages:
5. Firstly, it _mcomplexfixes_ increasingly difficult ???
6. it simplifies the _pd_ design ? What is pd?
7. In "Background thread and its lifecycle" section, I guess the 2 points
should be 3c and 3d, right?
That is:
a. Check if there is an in-flight event. If not, poll for the new events
from the channel.
b. Run the state machine, and handle the following scenarios:
c. Poll the networkClient.
d. Backoff for retryBackoffMs milliseconds.
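
Put together, a minimal sketch of that loop (runStateMachine and
pollNetworkClient are placeholders for the design's components, not real
methods):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BackgroundLoop implements Runnable {
    private final BlockingQueue<Object> channel = new LinkedBlockingQueue<>();
    private final long retryBackoffMs = 100L;
    private volatile boolean running = true;
    private Object inflightEvent;

    @Override
    public void run() {
        while (running) {
            if (inflightEvent == null) {
                inflightEvent = channel.poll();              // (a) new events, if any
            }
            inflightEvent = runStateMachine(inflightEvent);  // (b) may consume the event
            pollNetworkClient();                             // (c) send/receive requests
            backoff(retryBackoffMs);                         // (d) wait before looping
        }
    }

    private Object runStateMachine(Object event) { return null; }  // placeholder
    private void pollNetworkClient() {}                            // placeholder

    private void backoff(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
    }
}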

Thank you.
Luke

On Sat, Sep 17, 2022 at 6:29 AM Philip Nee  wrote:

> On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang  wrote:
>
> > Hi Philip,
> >
> > On Fri, Sep 16, 2022 at 1:20 PM Philip Nee  wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thank you for the reviews, and I agree with your suggestions: If we
> find
> > > any behavior changes, we will issue a KIP. Is it possible to first
> > publish
> > > the new implementation in a separate class so that we don't disrupt the
> > > existing implementation? Then we can issue a KIP to address the
> > > regressions.
> > >
> > > Yes! That's what I meant to clarify. Just in case other people may be
> > confused why we did not propose a KIP right away, but first as a
> one-pager.
> >
> >
> > > To your questions:
> > > 1. I think the coordinator discovery will happen on demand, i.e., the
> > > background thread will only try to discover a coordinator when the
> event
> > > (e.g., commit) requires one. I've noted that, and I'll make sure that
> in
> > > the proposal.
> > >
> > Thanks. Could you then clarify that for the coordinator state transition?
> > Is that just between `down` and `initialized`, and then `initialized` can
> > then transition back to `down` too?
> >
>
> Will do, I'll update the 1 pager to clarify it.
>
> >
> >
> > > 2. Acked. I think I'll add a section about how each operation works in
> > the
> > > new implementation.
> >
> > 3a. I will modify the existing state enum in the new implementation.
> > > 3b. Ah, I must have missed this when proofreading the document. I think
> > the
> > > state should be:
> > >   Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked
> > ->
> > > Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
> > > Partitions_Assigned -> Stable
> > >   I've made a note of that, and I'll update the document.
> > >
> > Got it, if we are introducing a third state for auto committing, then
> upon
> > completing the rebalance, we may also need to transit to that state since
> > we may also revoke partitions, right? This is for fixing KAFKA-14224
> >
> - Instead of introducing a new state, I think I want to break up the
> PREPARING_REBALANCE into Commit and Revoke.  So I think for your suggestion
> in 14224, we could potentially do it this way.
> 1. REVOKING_PARTITION: mute the partition and send the callback event to
> the polling thread
> 2. PARTITION_REVOKED: Received the revocation ack.  Advance the state to
> COMMIT.
> 3. COMMIT: Commit the offsets.  Wait for the commit to finish, then we can
> start the join group.
>
> Again, thanks, I'll update these changes in the 1-pager.
>
> >
> >
> > > 4. Noted. I'll add a section about exception handling.
> > > 5a.  Kirk also had the same comment. I updated the document.
> > > 5b. yes!
> > >
> > > Regarding the POC comments, I think the POC branch is actually quite
> > dated.
> > > I need to update it . Do you think we can start with simple 

Re: KafkaConsumer refactor proposal

2022-09-16 Thread Philip Nee
On Fri, Sep 16, 2022 at 3:01 PM Guozhang Wang  wrote:

> Hi Philip,
>
> On Fri, Sep 16, 2022 at 1:20 PM Philip Nee  wrote:
>
> > Hi Guozhang,
> >
> > Thank you for the reviews, and I agree with your suggestions: If we find
> > any behavior changes, we will issue a KIP. Is it possible to first
> publish
> > the new implementation in a separate class so that we don't disrupt the
> > existing implementation? Then we can issue a KIP to address the
> > regressions.
> >
> > Yes! That's what I meant to clarify. Just in case other people may be
> confused why we did not propose a KIP right away, but first as a one-pager.
>
>
> > To your questions:
> > 1. I think the coordinator discovery will happen on demand, i.e., the
> > background thread will only try to discover a coordinator when the event
> > (e.g., commit) requires one. I've noted that, and I'll make sure that in
> > the proposal.
> >
> Thanks. Could you then clarify that for the coordinator state transition?
> Is that just between `down` and `initialized`, and then `initialized` can
> then transition back to `down` too?
>

Will do, I'll update the 1-pager to clarify it.

>
>
> > 2. Acked. I think I'll add a section about how each operation works in
> the
> > new implementation.
>
> 3a. I will modify the existing state enum in the new implementation.
> > 3b. Ah, I must have missed this when proofreading the document. I think
> the
> > state should be:
> >   Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked
> ->
> > Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
> > Partitions_Assigned -> Stable
> >   I've made a note of that, and I'll update the document.
> >
> Got it, if we are introducing a third state for auto committing, then upon
> completing the rebalance, we may also need to transition to that state since
> we may also revoke partitions, right? This is for fixing KAFKA-14224
>
- Instead of introducing a new state, I think I want to break up the
PREPARING_REBALANCE into Commit and Revoke.  So I think for your suggestion
in 14224, we could potentially do it this way.
1. REVOKING_PARTITION: mute the partition and send the callback event to
the polling thread
2. PARTITION_REVOKED: Received the revocation ack.  Advance the state to
COMMIT.
3. COMMIT: Commit the offsets.  Wait for the commit to finish, then we can
start the join group.
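
In other words, something along these lines (a sketch of the naming above,
not a committed design):

enum RevocationState {
    REVOKING_PARTITION,  // mute the partitions; send the callback event to the polling thread
    PARTITION_REVOKED,   // revocation ack received from the polling thread
    COMMIT               // commit the offsets, then start the JoinGroup
}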

Again, thanks, I'll update these changes in the 1-pager.

>
>
> > 4. Noted. I'll add a section about exception handling.
> > 5a.  Kirk also had the same comment. I updated the document.
> > 5b. yes!
> >
> > Regarding the POC comments, I think the POC branch is actually quite
> dated.
> > I need to update it . Do you think we can start with simple PRs like
> > skeletons of ConsumerBackgroundThread.java and EventHandler interface and
> > implementations?
> >
> > Yes, I think as long as we are introducing new classes / interfaces in the
> internal package, we can incrementally merge PRs even if they are not
> integrated with the public KafkaConsumer class yet.
>
>
> > 1. I agree the naming can be confusing, but I think the usage of response
> > and request has implications for the type of event. As you said, BT -> PT
> > handles error and callback events, and PT -> BT handles mostly API calls.
> > I'll think again about the naming.
> > 2. Great suggestions.
> > 3. I think I'll start with defining a few basic events, like assign,
> > commits, and fetch. Then we can modify that as we progress with the
> > project.
> >
> > Thanks,
> > P
> >
> >
> > On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang 
> wrote:
> >
> > > Hello Philip,
> > >
> > > Thanks for writing down the 1-pager. Just to clarify, the reason we
> wrote
> > > this as a 1-pager instead of a KIP is that so far all the
> implementations
> > > should be internal, and hence not impacting anything on the public
> APIs.
> > > If, e.g. we found along with the implementation that some public
> > interfaces
> > > like metrics, need to be modified, then we will send out a separate
> > thread
> > > for that in a KIP proposal, right?
> > >
> > > I made a pass on the 1-pager and also some of the ongoing developments,
> > > here are just some high-level comments:
> > >
> > > On the 1-pager:
> > >
> > > 1) I think it's better to clarify the scenarios under manual assignment
> > > a.k.a. `consumer.assign()`. E.g. does the background thread still
> always
> > > tries to discover coordinator even if there's no commit requests (note
> > > that, if yes, this would be a behavioral change since some users like
> > > Connect may rely on the consumer to not ever try to discover the
> > > coordinator with manual assignment and no commit to brokers)? From the
> > > description it seems if there's no events from the channel to request
> > > committing offsets, it would not try to discover coordinator, but then
> > the
> > > background thread's state would would be in `down` and `initialized`,
> not
> > > in `stable`, and hence we should also allow transition 

Re: KafkaConsumer refactor proposal

2022-09-16 Thread Guozhang Wang
Hi Philip,

On Fri, Sep 16, 2022 at 1:20 PM Philip Nee  wrote:

> Hi Guozhang,
>
> Thank you for the reviews, and I agree with your suggestions: If we find
> any behavior changes, we will issue a KIP. Is it possible to first publish
> the new implementation in a separate class so that we don't disrupt the
> existing implementation? Then we can issue a KIP to address the
> regressions.
>
> Yes! That's what I meant to clarify. Just in case other people may be
confused why we did not propose a KIP right away, but first as a one-pager.


> To your questions:
> 1. I think the coordinator discovery will happen on demand, i.e., the
> background thread will only try to discover a coordinator when the event
> (e.g., commit) requires one. I've noted that, and I'll make sure that in
> the proposal.
>
Thanks. Could you then clarify that for the coordinator state transition?
Is that just between `down` and `initialized`, and then `initialized` can
then transition back to `down` too?


> 2. Acked. I think I'll add a section about how each operation works in the
> new implementation.

3a. I will modify the existing state enum in the new implementation.
> 3b. Ah, I must have missed this when proofreading the document. I think the
> state should be:
>   Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked ->
> Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
> Partitions_Assigned -> Stable
>   I've made a note of that, and I'll update the document.
>
Got it, if we are introducing a third state for auto committing, then upon
completing the rebalance, we may also need to transition to that state since
we may also revoke partitions, right? This is for fixing KAFKA-14224


> 4. Noted. I'll add a section about exception handling.
> 5a.  Kirk also had the same comment. I updated the document.
> 5b. yes!
>
> Regarding the POC comments, I think the POC branch is actually quite dated.
> I need to update it . Do you think we can start with simple PRs like
> skeletons of ConsumerBackgroundThread.java and EventHandler interface and
> implementations?
>
> Yes, I think as long as we are introducing new classes / interfaces in the
internal package, we can incrementally merge PRs even if they are not
integrated with the public KafkaConsumer class yet.


> 1. I agree the naming can be confusing, but I think the usage of response
> and request has implications for the type of event. As you said, BT -> PT
> handles error and callback events, and PT -> BT handles mostly API calls.
> I'll think again about the naming.
> 2. Great suggestions.
> 3. I think I'll start with defining a few basic events, like assign,
> commits, and fetch. Then we can modify that as we progress with the
> project.
>
> Thanks,
> P
>
>
> On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang  wrote:
>
> > Hello Philip,
> >
> > Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
> > this as a 1-pager instead of a KIP is that so far all the implementations
> > should be internal, and hence not impacting anything on the public APIs.
> > If, e.g. we found along with the implementation that some public
> interfaces
> > like metrics, need to be modified, then we will send out a separate
> thread
> > for that in a KIP proposal, right?
> >
> > I made a pass on the 1-pager and also some of the ongoing developments,
> > here are just some high-level comments:
> >
> > On the 1-pager:
> >
> > 1) I think it's better to clarify the scenarios under manual assignment
> > a.k.a. `consumer.assign()`. E.g. does the background thread still always
> > try to discover the coordinator even if there are no commit requests (note
> > that, if yes, this would be a behavioral change since some users like
> > Connect may rely on the consumer to not ever try to discover the
> > coordinator with manual assignment and no commit to brokers)? From the
> > description it seems if there's no events from the channel to request
> > committing offsets, it would not try to discover coordinator, but then
> the
> > background thread's state would be in `down` and `initialized`, not
> > in `stable`, and hence we should also allow transition from `initialized`
> > to `down` directly, right?
> >
> > 2) From the polling thread, besides the `poll` function changes, I think
> > it's better to also state other blocking function changes like commitSync
> > as well. I'd assume e.g. for commitSync it would be implemented as:
> >
> > * Send the commit-request to the server-event channel
> > * Continue polling from the consumer event channel, but skip other events
> > like rebalance-listener (we still need to bookkeep it for the next `poll`
> > call, but we just cannot trigger them since that breaks compatibility)
> > until we received the commit response event.
> >
> > Some details about how those functions would be implemented would also be
> > very helpful for the community's audience.
> >
> > 3) I have a few questions from the rebalance state machine section:
> >
> > 3.a). 

Re: KafkaConsumer refactor proposal

2022-09-16 Thread Philip Nee
Hi Guozhang,

Thank you for the reviews, and I agree with your suggestions: If we find
any behavior changes, we will issue a KIP. Is it possible to first publish
the new implementation in a separate class so that we don't disrupt the
existing implementation? Then we can issue a KIP to address the regressions.

To your questions:
1. I think the coordinator discovery will happen on demand, i.e., the
background thread will only try to discover a coordinator when the event
(e.g., commit) requires one. I've noted that, and I'll make sure that in
the proposal.
2. Acked. I think I'll add a section about how each operation works in the
new implementation (see the sketch below).
3a. I will modify the existing state enum in the new implementation.
3b. Ah, I must have missed this when proofreading the document. I think the
state should be:
  Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked ->
Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
Partitions_Assigned -> Stable
  I've made a note of that, and I'll update the document.
4. Noted. I'll add a section about exception handling.
5a.  Kirk also had the same comment. I updated the document.
5b. yes!
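
As one example for #2 (the sketch mentioned above), commitSync could follow
the shape Guozhang describes; the event and queue names here are assumed:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommitSyncSketch {
    record CommitEvent(Map<TopicPartition, OffsetAndMetadata> offsets,
                       CompletableFuture<Void> future) {}

    private final BlockingQueue<CommitEvent> applicationEventQueue;

    CommitSyncSketch(BlockingQueue<CommitEvent> queue) {
        this.applicationEventQueue = queue;
    }

    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        CommitEvent event = new CommitEvent(offsets, new CompletableFuture<>());
        applicationEventQueue.add(event);  // hand the call to the background thread
        // Block until the background thread completes the commit; rebalance
        // listener events seen meanwhile are bookkept for the next poll()
        // rather than triggered here.
        event.future().join();
    }
}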

Regarding the POC comments, I think the POC branch is actually quite dated.
I need to update it . Do you think we can start with simple PRs like
skeletons of ConsumerBackgroundThread.java and EventHandler interface and
implementations?

1. I agree the naming can be confusing, but I think the usage of response
and request has implications for the type of event. As you said, BT -> PT
handles error and callback events, and PT -> BT handles mostly API calls.
I'll think again about the naming.
2. Great suggestions.
3. I think I'll start with defining a few basic events, like assign,
commits, and fetch. Then we can modify that as we progress with the project.

Thanks,
P


On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang  wrote:

> Hello Philip,
>
> Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
> this as a 1-pager instead of a KIP is that so far all the implementations
> should be internal, and hence not impacting anything on the public APIs.
> If, e.g. we found along with the implementation that some public interfaces
> like metrics, need to be modified, then we will send out a separate thread
> for that in a KIP proposal, right?
>
> I made a pass on the 1-pager and also some of the ongoing developments,
> here are just some high-level comments:
>
> On the 1-pager:
>
> 1) I think it's better to clarify the scenarios under manual assignment
> a.k.a. `consumer.assign()`. E.g. does the background thread still always
> try to discover the coordinator even if there are no commit requests (note
> that, if yes, this would be a behavioral change since some users like
> Connect may rely on the consumer to not ever try to discover the
> coordinator with manual assignment and no commit to brokers)? From the
> description it seems if there's no events from the channel to request
> committing offsets, it would not try to discover the coordinator, but then the
> background thread's state would be in `down` or `initialized`, not
> in `stable`, and hence we should also allow transition from `initialized`
> to `down` directly, right?
>
> 2) From the polling thread, besides the `poll` function changes, I think
> it's better to also state other blocking function changes like commitSync
> as well. I'd assume e.g. for commitSync it would be implemented as:
>
> * Send the commit-request to the server-event channel
> * Continue polling from the consumer event channel, but skip other events
> like rebalance-listener (we still need to bookkeep it for the next `poll`
> call, but we just cannot trigger them since that breaks compatibility)
> until we received the commit response event.
>
> Some details about how those functions would be implemented would also be
> very helpful for the community's audience.
>
> 3) I have a few questions from the rebalance state machine section:
>
> 3.a). you mentioned in the state machine:
>
> "Wait for the partition revoked event, and advance the state to
> PARTITIONS_REVOKED"
> "Wait for partition assignment completion from the polling thread.  Advance
> to PARTITIONS_ASSIGNED"
>
> But we do not have those states today, I think you meant to say
> "PREPARE_REBALANCING" and "STABLE"?
>
> 3.b). Also, it seems that the proposed state transition would be Stable ->
> Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
> Assigning_Partitions -> Stable (for eager protocol at least), but when auto
> commit is enabled, we also need to commit offsets for those revoking
> partitions, and the auto-committing happens before the
> `onPartitionsRevoked` listener is triggered, so should auto-committing be
> part of the `Revoking_Partitions` state as well?
>
> 4) Could you expand on the "Exceptions thrown will be different."
> description and list all changes to the exceptions (note that, if they do
> exist, we also need a KIP). 

Re: KafkaConsumer refactor proposal

2022-09-16 Thread Philip Nee
Thank you, Kirk!  Your feedback is very much appreciated; I'll try to
address your points in this email and in the document.

Here are my responses:
1.  I removed that point because I think we should still try to maintain
the current behavior to avoid regressions.
2.  I think I misworded it; I'm referring to the coordinator code.  In
the sync-commit example, it is now the polling thread, not the
background thread, that waits for the future to complete.
I've made a note of that, and I'll make sure to clarify that section.
3.  All issues with that Jira tag should be addressed by this refactoring.
4.  Yes, it will be thread-safe, so a locking mechanism will be added
there (see the sketch after this list).  Noted.
5.  Noted.
6.  Thanks for bringing this up.  I think I need to add a section on how to
release these changes to AK.
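
Regarding point 4, this is the kind of locking I have in mind. It's a
rough sketch of mine (not from the POC branch), and the real code may
well use finer-grained locks:

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

final class SharedSubscriptionState {
    private final Object lock = new Object();
    private final Set<TopicPartition> assignment = new HashSet<>();

    // Called from the polling thread.
    Set<TopicPartition> assignedPartitions() {
        synchronized (lock) {
            return new HashSet<>(assignment);   // defensive copy
        }
    }

    // Called from the background thread when a rebalance completes.
    void assign(Set<TopicPartition> partitions) {
        synchronized (lock) {
            assignment.clear();
            assignment.addAll(partitions);
        }
    }
}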

Thank you!
P

On Thu, Sep 15, 2022 at 2:38 PM Kirk True  wrote:

> Hi Philip!
>
> Thanks for the write-up.
>
> On Tue, Sep 13, 2022, at 2:13 PM, Philip Nee wrote:
> > Hi all,
> >
> > Here is the proposal to refactor the Kafka Consumer
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
> >.
> > The 1-pager is on the wiki, so please take a look at it.  Also, this is a
> > prerequisite for KIP-848 (the next gen rebalance protocol).
>
> I only have time for a quick read-through, but here are some initial
> questions/comments:
>
>  1. The third bullet point in the "Public-Facing Changes" section says
> that the "exception[s] thrown will be different." Can you provide some more
> context on that? Will this affect user applications that attempt to handle
> exceptions?
>  2. Under "Scope" it mentions that the proposal is to "remove some
> blocking methods, such as commitOffsetSync." Are you referring to the
> Consumer.commitSync() method or something else?
>  3. I like how the proposal will address the systemic issues with the
> current consumer (
> https://issues.apache.org/jira/issues/?jql=labels%20%3D%20new-consumer-threading-should-fix).
> Is there a specific set of those Jiras that will be fixed/resolved, or is
> it 'best effort'?
>  4. "A note on the *subscriptionState*: Its reference will be shared by
> polling and background threads." Sharing this reference implies locking of
> some sort, yes?
>  5. Can you elaborate on this sentence: "We need to make sure the timing
> of the 1.  coordinator discovery and 2.  joinGroup operations are being
> done in the correct timing."?
>  6. Does this new implementation for the consumer internals live alongside
> the current implementation in the code base? How does a user opt-in to the
> "new" implementation?
>
> >
> > Cheers,
> > P
> >
>
> Thanks!
> Kirk


Re: KafkaConsumer refactor proposal

2022-09-15 Thread Guozhang Wang
Hello Philip,

Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
this as a 1-pager instead of a KIP is that so far all the implementations
should be internal, and hence should not impact the public APIs.
If, e.g., we find during implementation that some public interfaces,
like metrics, need to be modified, then we will send out a separate thread
for that in a KIP proposal, right?

I made a pass on the 1-pager and also some of the ongoing developments,
here are just some high-level comments:

On the 1-pager:

1) I think it's better to clarify the scenarios under manual assignment
a.k.a. `consumer.assign()`. E.g. does the background thread still always
try to discover the coordinator even if there are no commit requests (note
that, if yes, this would be a behavioral change since some users like
Connect may rely on the consumer to not ever try to discover the
coordinator with manual assignment and no commit to brokers)? From the
description it seems if there's no events from the channel to request
committing offsets, it would not try to discover the coordinator, but then the
background thread's state would be in `down` or `initialized`, not
in `stable`, and hence we should also allow transition from `initialized`
to `down` directly, right?
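
In other words, I'd expect something like the following on the
background thread (all names here are placeholders, just to pin down
the semantics I mean):

final class LazyCoordinatorSketch {
    interface Event {
        boolean requiresCoordinator();   // true for commits, false for plain fetches
    }

    private Object coordinator;          // null until discovered

    void handle(Event event) {
        // Under assign() with no commits, no event ever requires a
        // coordinator, so FindCoordinator is never sent at all.
        if (event.requiresCoordinator() && coordinator == null) {
            coordinator = lookupCoordinator();
        }
        // ... process the event ...
    }

    private Object lookupCoordinator() {
        return new Object();             // placeholder for the FindCoordinator round trip
    }
}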

2) From the polling thread, besides the `poll` function changes, I think
it's better to also state other blocking function changes like commitSync
as well. I'd assume e.g. for commitSync it would be implemented as:

* Send the commit-request to the server-event channel
* Continue polling from the consumer event channel, but skip other events
like rebalance-listener (we still need to bookkeep it for the next `poll`
call, but we just cannot trigger them since that breaks compatibility)
until we received the commit response event.

Some details about how those functions would be implemented would also be
very helpful for the community's audience.
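
To seed that discussion, here is a minimal sketch of the commitSync
flow I described above. Every type and channel name in it is made up;
the real event types are still being designed in the POC PRs:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class CommitSyncSketch {
    interface BackgroundEvent {}
    static final class CommitResponseEvent implements BackgroundEvent {}

    private final BlockingQueue<Object> requestChannel;           // polling -> background
    private final BlockingQueue<BackgroundEvent> responseChannel; // background -> polling
    private final Queue<BackgroundEvent> deferred = new ArrayDeque<>();

    CommitSyncSketch(BlockingQueue<Object> requests,
                     BlockingQueue<BackgroundEvent> responses) {
        this.requestChannel = requests;
        this.responseChannel = responses;
    }

    void commitSync(Object commitRequest, long timeoutMs) throws InterruptedException {
        requestChannel.put(commitRequest);                 // 1. hand commit to background thread
        long deadline = System.currentTimeMillis() + timeoutMs;
        long remaining;
        while ((remaining = deadline - System.currentTimeMillis()) > 0) {
            BackgroundEvent event = responseChannel.poll(remaining, TimeUnit.MILLISECONDS);
            if (event == null) break;                          // waited out the deadline
            if (event instanceof CommitResponseEvent) return;  // 2. commit completed
            deferred.add(event);   // 3. bookkeep listener events for the next poll()
        }
        throw new RuntimeException("commitSync timed out");    // stand-in for TimeoutException
    }
}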

3) I have a few questions from the rebalance state machine section:

3.a). you mentioned in the state machine:

"Wait for the partition revoked event, and advance the state to
PARTITIONS_REVOKED"
"Wait for partition assignment completion from the polling thread.  Advance
to PARTITIONS_ASSIGNED"

But we do not have those states today, I think you meant to say
"PREPARE_REBALANCING" and "STABLE"?

3.b). Also, it seems that the proposed state transition would be Stable ->
Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
Assigning_Partitions -> Stable (for eager protocol at least), but when auto
commit is enabled, we also need to commit offsets for those revoking
partitions, and the auto-committing happens before the
`onPartitionsRevoked` listener is triggered, so should auto-committing be
part of the `Revoking_Partitions` state as well?
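
Concretely, I'd expect the ordering inside that state to look roughly
like this (a sketch only; the class and method names are mine):

import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

final class RevocationSketch {
    private final boolean autoCommitEnabled;
    private final ConsumerRebalanceListener listener;

    RevocationSketch(boolean autoCommitEnabled, ConsumerRebalanceListener listener) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.listener = listener;
    }

    // Runs while the state machine is in Revoking_Partitions.
    void revoke(Collection<TopicPartition> revoking) {
        if (autoCommitEnabled) {
            commitOffsetsSync(revoking);            // step 1: flush offsets for revoking partitions
        }
        listener.onPartitionsRevoked(revoking);     // step 2: user callback fires afterwards
        // The state then advances: Revoking_Partitions -> Prepare_Rebalancing -> ...
    }

    private void commitOffsetsSync(Collection<TopicPartition> partitions) {
        // placeholder for the actual OffsetCommit request
    }
}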

4) Could you expand on the "Exceptions thrown will be different."
description and list all changes to the exceptions (note that, if they do
exist, we also need a KIP). For example, besides WakeupException, Kafka
also has an InterruptException (different from Java's own
InterruptedException) defined on the public APIs. Are we going to change
which functions throw it and which do not?

5) In the testing plan:

5.a) Could you elaborate a bit more on "We need to make sure the timing of
the 1.  coordinator discovery and 2.  joinGroup operations are being done
in the correct timing."

5.b) I'd also add that for all the blocking APIs, including `poll`, where a
timeout value is provided either as a param or implicitly from
`default.api.timeout.ms`, the timeout would now be strictly respected --- to me this
is also one of the key motivations of this refactoring :)
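
For example, I'd expect both of the calls below to be strictly bounded
after the refactor (this is the standard KafkaConsumer API, shown
purely as a usage sketch):

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;

final class BoundedCallsSketch {
    // With the refactor, the polling thread does no network I/O itself,
    // so these calls should return within (roughly) the given timeout,
    // even when the coordinator is slow or unavailable.
    static void boundedCalls(Consumer<String, String> consumer) {
        consumer.poll(Duration.ofMillis(500));
        consumer.commitSync(Duration.ofSeconds(5));
    }
}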

-

And the current POC PRs:

1. Just a quick thought about the naming of "KafkaServerEventQueue": I
think I also agree with others that it may be confusing to include
`KafkaServer` on a client class, what about renaming it to
"ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know that
it sounds a bit awkward to have the `ResponseEventQueue` also return
rebalance-listener-triggering events, but that may be less confusing.

2. I'd suggest for new modules like `ConsumerBackgroundThread`, we first
define an interface in the `internals` package, e.g. `RequestEventHandler`
(assuming the previous rename suggestion), and then have a
`DefaultRequestEventHandler` implementation class which encapsulate the
background thread. This enables us to easily write unit tests that isolate
other modules, especially with concurrent threading.
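
For instance (the RequestEvent type and the method names are
placeholders of mine), the split could look like:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

interface RequestEvent {}

interface RequestEventHandler extends AutoCloseable {
    void add(RequestEvent event);    // called by the polling thread
    @Override void close();
}

final class DefaultRequestEventHandler implements RequestEventHandler {
    private final BlockingQueue<RequestEvent> queue = new LinkedBlockingQueue<>();
    private final Thread backgroundThread =
        new Thread(this::runLoop, "consumer-background-thread");

    DefaultRequestEventHandler() {
        backgroundThread.start();
    }

    @Override
    public void add(RequestEvent event) {
        queue.add(event);
    }

    @Override
    public void close() {
        backgroundThread.interrupt();    // runLoop exits via InterruptedException
    }

    private void runLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                RequestEvent event = queue.take();
                // network I/O, coordinator discovery, etc. would happen here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // shutting down
        }
    }
}

A unit test can then hand the consumer a mock RequestEventHandler and
assert on the events it receives, without spinning up a real thread.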

3. For `KafkaServerEventType`: where would NOOP be used? Also I think
there are other types, like FETCH_COMMITTED as well?



Thanks,
Guozhang


On Tue, Sep 13, 2022 at 2:14 PM Philip Nee  wrote:

> Hi all,
>
> Here is the proposal to refactor the Kafka Consumer
> <
> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
> >.
> The 1-pager is on the wiki, so please take a look at it.  Also, 

Re: KafkaConsumer refactor proposal

2022-09-15 Thread Kirk True
Hi Philip!

Thanks for the write-up.

On Tue, Sep 13, 2022, at 2:13 PM, Philip Nee wrote:
> Hi all,
> 
> Here is the proposal to refactor the Kafka Consumer
> <https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor>.
> The 1-pager is on the wiki, so please take a look at it.  Also, this is a
> prerequisite for KIP-848 (the next gen rebalance protocol).

I only have time for a quick read-through, but here are some initial 
questions/comments:

 1. The third bullet point in the "Public-Facing Changes" section says that the 
"exception[s] thrown will be different." Can you provide some more context on 
that? Will this affect user applications that attempt to handle exceptions?
 2. Under "Scope" it mentions that the proposal is to "remove some blocking 
methods, such as commitOffsetSync." Are you referring to the 
Consumer.commitSync() method or something else?
 3. I like how the proposal will address the systemic issues with the current 
consumer 
(https://issues.apache.org/jira/issues/?jql=labels%20%3D%20new-consumer-threading-should-fix).
 Is there a specific set of those Jiras that will be fixed/resolved, or is it 
'best effort'?
 4. "A note on the *subscriptionState*: Its reference will be shared by polling 
and background threads." Sharing this reference implies locking of some sort, 
yes?
 5. Can you elaborate on this sentence: "We need to make sure the timing of the 
1.  coordinator discovery and 2.  joinGroup operations are being done in the 
correct timing."?
 6. Does this new implementation for the consumer internals live alongside the 
current implementation in the code base? How does a user opt-in to the "new" 
implementation?

> 
> Cheers,
> P
> 

Thanks!
Kirk