Hello Philip,

Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
this as a 1-pager instead of a KIP is that, so far, all the changes should
be internal and hence should not impact any public APIs. If, e.g., we find
during the implementation that some public interfaces, like metrics, need
to be modified, then we will start a separate KIP discussion thread for
that, right?

I made a pass over the 1-pager and some of the ongoing development work;
here are some high-level comments:

On the 1-pager:

1) I think it's better to clarify the scenarios under manual assignment,
a.k.a. `consumer.assign()`. E.g., does the background thread still always
try to discover the coordinator even if there are no commit requests? (Note
that if so, this would be a behavioral change, since some users like
Connect may rely on the consumer never trying to discover the coordinator
under manual assignment when no offsets are committed to brokers.) From the
description, it seems that if there are no events from the channel
requesting an offset commit, it would not try to discover the coordinator;
but then the background thread's state would stay in `down` or
`initialized`, never reaching `stable`, and hence we should also allow a
direct transition from `initialized` to `down`, right?
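To make the suggested transition concrete, here is a rough Java sketch. The three state names come from the 1-pager, but the rest of the transition table (DOWN -> INITIALIZED -> STABLE -> DOWN) and the `BackgroundStateMachine` / `shouldDiscoverCoordinator` names are my own assumptions for illustration, not what the POC does:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// State names from the 1-pager; the transition table below is an assumption.
enum BackgroundThreadState { DOWN, INITIALIZED, STABLE }

final class BackgroundStateMachine {
    private static final Map<BackgroundThreadState, Set<BackgroundThreadState>> ALLOWED =
            new EnumMap<>(BackgroundThreadState.class);

    static {
        ALLOWED.put(BackgroundThreadState.DOWN, EnumSet.of(BackgroundThreadState.INITIALIZED));
        // INITIALIZED -> DOWN covers manual assignment with no commits ever issued,
        // where the thread never needs a coordinator and so never becomes STABLE.
        ALLOWED.put(BackgroundThreadState.INITIALIZED,
                EnumSet.of(BackgroundThreadState.STABLE, BackgroundThreadState.DOWN));
        ALLOWED.put(BackgroundThreadState.STABLE, EnumSet.of(BackgroundThreadState.DOWN));
    }

    private BackgroundThreadState state = BackgroundThreadState.DOWN;

    BackgroundThreadState state() {
        return state;
    }

    // Rejects any transition not listed in the table above.
    void transitionTo(BackgroundThreadState next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + next);
        }
        state = next;
    }

    // Under manual assignment, only look up the coordinator once a commit is requested,
    // preserving the current behavior that users like Connect may rely on.
    static boolean shouldDiscoverCoordinator(boolean usesGroupManagement, boolean hasPendingCommit) {
        return usesGroupManagement || hasPendingCommit;
    }
}
```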

2) For the polling thread, besides the `poll` function changes, I think
it's better to also describe the changes to other blocking functions, like
`commitSync`. I'd assume, e.g., that `commitSync` would be implemented as:

* Send the commit-request event to the server-event channel.
* Continue polling from the consumer-event channel until we receive the
commit-response event, skipping other events like rebalance-listener
events in the meantime (we still need to bookkeep them for the next `poll`
call, but we cannot trigger them here since that would break
compatibility).
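A minimal sketch of that `commitSync` flow, assuming two in-memory `BlockingQueue`s stand in for the server-event and consumer-event channels; all event classes and names here are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical event types for illustration only; not the classes in the POC PRs.
interface ConsumerEvent {}
record CommitRequestEvent(long requestId) implements ConsumerEvent {}
record CommitResponseEvent(long requestId, boolean success) implements ConsumerEvent {}
record RebalanceListenerEvent(String callback) implements ConsumerEvent {}

final class CommitSyncSketch {
    private final BlockingQueue<ConsumerEvent> requestQueue;  // polling -> background thread
    private final BlockingQueue<ConsumerEvent> responseQueue; // background -> polling thread
    // Events that must not fire inside commitSync; kept for the next poll() call.
    final Deque<ConsumerEvent> deferredEvents = new ArrayDeque<>();
    private long nextRequestId = 0;

    CommitSyncSketch(BlockingQueue<ConsumerEvent> requestQueue,
                     BlockingQueue<ConsumerEvent> responseQueue) {
        this.requestQueue = requestQueue;
        this.responseQueue = responseQueue;
    }

    CommitSyncSketch() {
        this(new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>());
    }

    boolean commitSync(long timeoutMs) throws InterruptedException, TimeoutException {
        long requestId = nextRequestId++;
        requestQueue.put(new CommitRequestEvent(requestId));
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0) {
                throw new TimeoutException("commitSync timed out after " + timeoutMs + " ms");
            }
            ConsumerEvent event = responseQueue.poll(remainingNs, TimeUnit.NANOSECONDS);
            if (event instanceof CommitResponseEvent response && response.requestId() == requestId) {
                return response.success();
            }
            if (event != null) {
                // e.g. a rebalance-listener event: bookkeep it, but do not trigger it here.
                deferredEvents.addLast(event);
            }
        }
    }
}
```

The deferred events would then be drained at the start of the next `poll` call, which is where the rebalance listeners can safely fire.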

Some details on how these functions would be implemented would also be
very helpful for the community.

3) I have a few questions about the rebalance state machine section:

3.a) You mentioned in the state machine:

"Wait for the partition revoked event, and advance the state to
PARTITIONS_REVOKED"
"Wait for partition assignment completion from the polling thread.  Advance
to PARTITIONS_ASSIGNED"

But we do not have those states today; did you mean "PREPARE_REBALANCING"
and "STABLE" instead?

3.b) Also, it seems that the proposed state transitions would be Stable ->
Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
Assigning_Partitions -> Stable (at least for the eager protocol). But when
auto-commit is enabled, we also need to commit offsets for the partitions
being revoked, and that auto-commit happens before the
`onPartitionsRevoked` listener is triggered. So should auto-committing be
part of the `Revoking_Partitions` state as well?
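Roughly, what I have in mind for the eager case: the state names follow the chain above, while the entry actions and the `EagerRebalanceSketch` class are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// States follow the eager-protocol chain above; entry actions are illustrative assumptions.
enum RebalanceState {
    STABLE, REVOKING_PARTITIONS, PREPARE_REBALANCING, COMPLETE_REBALANCING, ASSIGNING_PARTITIONS;

    RebalanceState next() {
        return switch (this) {
            case STABLE -> REVOKING_PARTITIONS;
            case REVOKING_PARTITIONS -> PREPARE_REBALANCING;
            case PREPARE_REBALANCING -> COMPLETE_REBALANCING;
            case COMPLETE_REBALANCING -> ASSIGNING_PARTITIONS;
            case ASSIGNING_PARTITIONS -> STABLE;
        };
    }
}

final class EagerRebalanceSketch {
    private final boolean autoCommitEnabled;
    private RebalanceState state = RebalanceState.STABLE;
    final List<String> actions = new ArrayList<>(); // records what ran, in order

    EagerRebalanceSketch(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    RebalanceState state() {
        return state;
    }

    // Advance one step and run the entry actions of the new state.
    void advance() {
        state = state.next();
        switch (state) {
            case REVOKING_PARTITIONS -> {
                // Auto-commit of the revoked partitions must happen before the listener
                // fires, which is why it arguably belongs to this state.
                if (autoCommitEnabled) {
                    actions.add("autoCommit");
                }
                actions.add("onPartitionsRevoked");
            }
            case ASSIGNING_PARTITIONS -> actions.add("onPartitionsAssigned");
            default -> { }
        }
    }
}
```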

4) Could you expand on the "Exceptions thrown will be different."
description and list all changes to the exceptions? (Note that if there
are any, we also need a KIP.) For example, besides WakeupException, Kafka
also has an InterruptException (different from Java's own
InterruptedException) defined on the public APIs; are we going to change
which functions throw it and which do not?

5) In the testing plan:

5.a) Could you elaborate a bit more on "We need to make sure the timing of
the 1.  coordinator discovery and 2.  joinGroup operations are being done
in the correct timing."

5.b) I'd also add that all the blocking APIs, including `poll`, where a
timeout value is provided either as a param or implicitly from
`default.api.timeout.ms`, would now strictly respect that timeout; to me
this is also one of the key motivations of this refactoring :)
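One way to get that strictness, sketched under the assumption of a hypothetical `DeadlineTimer` helper: compute the budget once when the API is called, and have every internal blocking step (coordinator discovery, commit, fetch) wait only for `remainingMs()` rather than the full timeout:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper: one deadline shared by every step of a single blocking call.
final class DeadlineTimer {
    private final LongSupplier nanoClock;
    private final long startNs;
    private final long timeoutNs;

    DeadlineTimer(long timeoutMs, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.startNs = nanoClock.getAsLong();
        // TimeUnit conversion saturates at Long.MAX_VALUE instead of overflowing.
        this.timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    DeadlineTimer(long timeoutMs) {
        this(timeoutMs, System::nanoTime);
    }

    // Compares elapsed time, not absolute deadlines, so nanoTime wrap-around is safe.
    long remainingMs() {
        long elapsedNs = nanoClock.getAsLong() - startNs;
        return Math.max(0, TimeUnit.NANOSECONDS.toMillis(timeoutNs - elapsedNs));
    }

    boolean isExpired() {
        return nanoClock.getAsLong() - startNs >= timeoutNs;
    }
}
```

The injectable clock is only there so the timer can be tested deterministically; production code would use the single-argument constructor.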

-------------------------

And on the current POC PRs:

1. Just a quick thought about the naming of `KafkaServerEventQueue`: I
agree with others that it may be confusing to include `KafkaServer` in a
client class name. What about renaming it to `ConsumerRequestEventQueue`
and `ConsumerResponseEventQueue`? I know it sounds a bit awkward for the
`ResponseEventQueue` to also return rebalance-listener-triggering events,
but that may be less confusing.

2. For new modules like `ConsumerBackgroundThread`, I'd suggest we first
define an interface in the `internals` package, e.g. `RequestEventHandler`
(assuming the previous rename suggestion), and then have a
`DefaultRequestEventHandler` implementation class which encapsulates the
background thread. This lets us easily write unit tests that isolate
other modules, especially given the concurrent threads involved.
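A rough sketch of the split I mean; the interface and class names are from the suggestion above, while the method signatures and the `FakeRequestEventHandler` test double are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Names from the suggestion above; the method signatures are hypothetical.
interface RequestEventHandler extends AutoCloseable {
    boolean add(Object requestEvent); // hand an event to the background side
    Object poll();                    // next response event, or null if none is ready
    @Override
    void close();                     // narrowed: no checked exception
}

// Production implementation: owns and encapsulates the background thread.
final class DefaultRequestEventHandler implements RequestEventHandler {
    private final BlockingQueue<Object> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Object> responseQueue = new LinkedBlockingQueue<>();
    private final Thread backgroundThread;
    private volatile boolean running = true;

    DefaultRequestEventHandler() {
        backgroundThread = new Thread(this::runLoop, "consumer-background-thread");
        backgroundThread.setDaemon(true);
    }

    void start() {
        backgroundThread.start();
    }

    private void runLoop() {
        try {
            while (running) {
                Object request = requestQueue.poll(50, TimeUnit.MILLISECONDS);
                if (request != null) {
                    // Placeholder for the real work (network I/O, coordinator logic, ...).
                    responseQueue.put("handled:" + request);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() interrupts us to exit the loop
        }
    }

    @Override
    public boolean add(Object requestEvent) {
        return requestQueue.offer(requestEvent);
    }

    @Override
    public Object poll() {
        return responseQueue.poll();
    }

    @Override
    public void close() {
        running = false;
        backgroundThread.interrupt();
        try {
            backgroundThread.join(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Test double: no threads, so other modules can be unit-tested deterministically.
final class FakeRequestEventHandler implements RequestEventHandler {
    final List<Object> received = new ArrayList<>();
    final List<Object> scriptedResponses = new ArrayList<>();

    @Override
    public boolean add(Object requestEvent) {
        return received.add(requestEvent);
    }

    @Override
    public Object poll() {
        return scriptedResponses.isEmpty() ? null : scriptedResponses.remove(0);
    }

    @Override
    public void close() { }
}
```

Modules on the polling-thread side would depend only on `RequestEventHandler`, so their unit tests can use the fake without spinning up any threads.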

3. For `KafkaServerEventType`: where would NOOP be used? Also, I think
there are other types as well, like FETCH_COMMITTED?



Thanks,
Guozhang


On Tue, Sep 13, 2022 at 2:14 PM Philip Nee <philip...@gmail.com> wrote:

> Hi all,
>
> Here is the proposal to refactor the Kafka Consumer:
> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
> The 1-pager is on the wiki, so please take a look at it.  Also, this is a
> prerequisite for KIP-848 (the next gen rebalance protocol).
>
> Cheers,
> P
>


