[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842530#comment-17842530
 ] 

Kirk True commented on KAFKA-16637:
-----------------------------------

[~chickenchickenlove]—thanks for filing this. There are two existing 
improvements (KAFKA-15974 and KAFKA-16200) that fix timing issues in the new 
consumer. However, even when testing your case on a temporary branch that 
includes fixes for both of those issues, the problem still showed up.

This issue is related to an optimization for offset fetch logic.

When a user calls {{Consumer.poll()}}, among other things, the consumer 
performs a network request to fetch any previously-committed offsets so it can 
determine where to start fetching new records. When the user passes in a 
timeout of zero, the offset fetch network request will almost never complete 
within 0 milliseconds. However, the consumer still sends the request and 
handles the response when it arrives, usually a few milliseconds later. On this 
first attempt, the lookup times out and the {{poll()}} loop comes back around. 
Because timing out on the first attempt is the common case, the consumer caches 
the offset fetch response/result from that attempt (even though it timed out), 
knowing that the _next_ call to {{poll()}} will attempt the exact same 
operation. On the second attempt, the response from the first attempt is 
already there, so the consumer doesn't need to perform another network request.
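
For illustration only (this is not code from the report; the topic name and 
configs are made up), the zero-timeout polling pattern described above looks 
roughly like this:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ZeroTimeoutPollExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Opt into the new (KIP-848) consumer group protocol, as in the report.
        props.put("group.protocol", "consumer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic1"));
            while (true) {
                // A zero timeout almost never leaves enough time for the
                // committed-offset fetch to complete on the first attempt;
                // the consumer is expected to cache that in-flight request so
                // a subsequent poll() can reuse its response.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
{code}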
 
The existing consumer has implemented this caching in 
[PendingCommittedOffsetRequest|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L132].
 The new consumer has implemented it in 
[CommitRequestManager|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L510].
 The core issue is that the new consumer implementation clears out the first 
attempt's cached result too aggressively. As a result, the second (and 
subsequent) attempts fail to find the previous attempt's cached result, so they 
all submit new network requests, which all fail in the same way. Thus the 
consumer never makes any headway.
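
A minimal sketch of the intended caching behavior (this is not the actual 
{{CommitRequestManager}} code; the class and method names are invented): the 
pending fetch should survive a timed-out attempt and only be dropped once its 
result has actually been handed back to a later attempt.

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the caching behavior described above; it is not the
// real CommitRequestManager implementation.
class PendingOffsetFetchSketch {

    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> pendingResult;
    private Set<TopicPartition> pendingPartitions;

    synchronized CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedOffsets(Set<TopicPartition> partitions) {
        // Reuse the previous attempt's request/result when the next poll() asks
        // for the same partitions, even though that earlier attempt timed out
        // from the caller's point of view.
        if (pendingResult != null && partitions.equals(pendingPartitions)) {
            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = pendingResult;
            if (result.isDone()) {
                // Only after the cached result is handed back is it safe to
                // drop it. Clearing it on every timed-out attempt (the bug
                // described above) means later attempts never find it.
                pendingResult = null;
                pendingPartitions = null;
            }
            return result;
        }
        pendingResult = sendOffsetFetchRequest(partitions);
        pendingPartitions = partitions;
        return pendingResult;
    }

    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
        // Placeholder for the actual network request.
        return new CompletableFuture<>();
    }
}
{code}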

> KIP-848 does not work well
> --------------------------
>
>                 Key: KAFKA-16637
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16637
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: sanghyeok An
>            Assignee: Kirk True
>            Priority: Minor
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>         Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
