lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1831724350
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -111,7 +111,7 @@ public class ConsumerConfig extends AbstractConfig {
* <code>group.protocol</code>
*/
public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
- public static final String DEFAULT_GROUP_PROTOCOL =
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT);
+ public static final String DEFAULT_GROUP_PROTOCOL =
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);
Review Comment:
we should leave this change of the default consumer in a separate PR, there
is already one for it
https://github.com/apache/kafka/pull/17107
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1574,7 +1583,11 @@ private boolean updateFetchPositions(final Timer timer) {
try {
CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new
CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
- cachedSubscriptionHasAllFetchPositions =
applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+ applicationEventHandler.add(checkAndUpdatePositionsEvent);
+ cachedSubscriptionHasAllFetchPositions = processBackgroundEvents(
Review Comment:
Hey @m1a2st , thanks for the reply. The trick here is that this
`processBackgroundEvents` is kind of a generic communication channel between
the app thread and background thread, that was designed as a way to communicate
from background to app thread when there was no direct triggering api call:
1. heartbeat/coordinator errors received in a response to requests that are
triggered internally in the background (HB and FindCoordinator), not coming
from app thread events.
2. callbacks needed as part of a reconciliation triggered by the broker (not
app thread event)
Using this generic channel for when we do have an event that started in the
app thread seems to bring in complexity and loose ends:
1. Complexity because api calls would need to sort through the generic
queue/channel to find the response to the specific event they are interested
in, processing all background events (callbacks, errors that have no event,
errors that have event), and filter out / re-enqueue...
2. We would leave events lingering uncompleted in the background, that will
never complete really (eventually expired I expect).
On the other hand, we have a “direct” way of communicating the 2 threads:
via events that get completed. So if we hand an app event handy, seems sensible
to just complete it with metadata errors if they happen (just like the classic
achieves, given that on every poll it will throw and bubble up the metadata
exception, effectively failing the triggering call). Ex. `consumer.position`
(the one that started this PR), sends a CheckAndUpdatePositions event to the
background, so I was expecting that we could simply complete it with the
TopicAuthorizationException as a way to fail the consumer call.
Those are the concepts, but I totally hear you about the challenges on the
implementation. Could you elaborate more on that to see if we can sort them out
together maybe? This is a more detailed description of what I have in mind:
1. Catch metadata errors on `ConsumerNetworkThread.runOnce` and
completeExceptionally all the requests it has (requests returned by the
managers poll here
https://github.com/apache/kafka/blob/a0d4cbec402c6c09e601c76a7332747e80e518ca/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L145)
This would propagate the failure to the app thread via the event.
2. Notify managers of the metadata error too (ie. onMetadataERror), in case
they have requests generated that did not make it to point 1, but still need to
fail (ie. Commit request manager may have
unsentOffsetCommitRequests/unsentOffsetFetchRequests. I don't think any other
keeps requests without sending them to the network client but let's double
check)
What do you think? Would it work? Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]