[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15848:
------------------------------
    Summary: Consumer API timeout inconsistent between ConsumerDelegate implementations (was: Consumer API timeout inconsistent between LegacyKafkaConsumer and AsyncKafkaConsumer)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-15848
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15848
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, unit tests
>            Reporter: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support, kip-848-preview
>
> The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations differ 
> fundamentally in how they use the supplied {{Timer}}.
> h3. tl;dr
> For time-bounded tasks, {{LegacyKafkaConsumer}} does the following:
> # Attempt the operation
> # If successful, return result
> # Check timer, return if expired
> # Go to top
> {{AsyncKafkaConsumer}} effectively does the inverse:
> # Check timer, return if expired
> # Attempt the operation
> # If successful, return result
> # Go to top
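> {{AsyncKafkaConsumer}} is very literal about the timeout: if the timer has 
> already expired (e.g. a zero timeout), the operation is never attempted. 
> {{LegacyKafkaConsumer}} gives a little wiggle room, because it effectively 
> makes at least one attempt before it checks the timer.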
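> The two orderings above can be sketched in isolation. This is a simplified, 
> hypothetical illustration rather than Kafka code: {{SimpleTimer}}, 
> {{attemptThenCheck()}} and {{checkThenAttempt()}} are made-up names, and the 
> operation is modeled as a {{BooleanSupplier}} that returns {{true}} on success.
> {code:java}
> import java.util.function.BooleanSupplier;
>
> // Hypothetical sketch of the two loop orderings; not Kafka's Timer or consumer code.
> public class TimerLoops {
>
>     /** Minimal deadline timer (a stand-in for org.apache.kafka.common.utils.Timer). */
>     public static final class SimpleTimer {
>         private final long deadlineNanos;
>
>         public SimpleTimer(long timeoutMs) {
>             this.deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
>         }
>
>         public boolean isExpired() {
>             return System.nanoTime() - deadlineNanos >= 0;
>         }
>     }
>
>     /** LegacyKafkaConsumer ordering: attempt, then check the timer. Returns the attempt count. */
>     public static int attemptThenCheck(BooleanSupplier operation, SimpleTimer timer) {
>         int attempts = 0;
>         do {
>             attempts++;
>             if (operation.getAsBoolean()) {
>                 return attempts;              // success: return right away
>             }
>         } while (!timer.isExpired());         // the timer is consulted only after an attempt
>         return attempts;
>     }
>
>     /** AsyncKafkaConsumer ordering: check the timer, then attempt. Returns the attempt count. */
>     public static int checkThenAttempt(BooleanSupplier operation, SimpleTimer timer) {
>         int attempts = 0;
>         while (!timer.isExpired()) {          // an already-expired timer skips the body entirely
>             attempts++;
>             if (operation.getAsBoolean()) {
>                 return attempts;
>             }
>         }
>         return attempts;
>     }
>
>     public static void main(String[] args) {
>         BooleanSupplier alwaysSucceeds = () -> true;
>         // A zero-length timeout, as in consumer.poll(Duration.ofMillis(0)).
>         System.out.println("attempt-then-check: " + attemptThenCheck(alwaysSucceeds, new SimpleTimer(0)));
>         System.out.println("check-then-attempt: " + checkThenAttempt(alwaysSucceeds, new SimpleTimer(0)));
>     }
> }
> {code}
> With a zero-length timeout, {{main()}} prints {{attempt-then-check: 1}} and 
> {{check-then-attempt: 0}}: the same operation that would have succeeded is 
> never tried under the second ordering.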
> h3. How to reproduce
> This causes subtle timing differences that are easy to reproduce with the 
> {{KafkaConsumerTest}} unit tests, e.g. 
> {{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}. This test 
> invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length 
> timeout.
> As part of the {{poll()}} logic, the consumer may need to refresh offsets. To 
> accomplish this, {{LegacyKafkaConsumer}} uses the 
> {{ConsumerCoordinator.fetchCommittedOffsets()}} method which is structured 
> like this:
> {code:java}
> public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) {
>     do {
>         final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
>         // . . .
>         client.poll(future, timer);
>         // . . .
>         if (future.succeeded())
>             return future.value();
>         // . . .
>     } while (timer.notExpired());
>     return null;
> }
> {code}
> The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
>     Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
>     try {
>         OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
>         Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
>         // . . .
>         return true;
>     } catch (TimeoutException e) {
>         return false;
>     }
> }
> {code}
> The call to {{addAndGet}} enqueues the event on the queue for the network 
> I/O thread and then _immediately_ invokes {{Future.get()}} with the timer's 
> remaining time to implement a time-bounded blocking call. Because of the 
> {{poll(0)}} call above, {{Future.get()}} is passed a timeout of {{0}} and 
> _immediately_ throws a {{TimeoutException}}, so the offset fetch fails before 
> the request has any chance to complete.
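> The enqueue-then-wait pattern can be reproduced with plain 
> {{java.util.concurrent}} classes. This is a hypothetical sketch, not Kafka's 
> {{ApplicationEventHandler}}: the queue, the worker thread standing in for the 
> network I/O thread, the 50 ms simulated latency and {{addAndGet(long)}} are all 
> assumptions made for illustration.
> {code:java}
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
>
> // Hypothetical sketch of "enqueue, then Future.get() with a timeout"; not Kafka's ApplicationEventHandler.
> public class ZeroTimeoutDemo {
>
>     private final BlockingQueue<CompletableFuture<String>> queue = new LinkedBlockingQueue<>();
>
>     public ZeroTimeoutDemo() {
>         // Stand-in for the network I/O thread: completes each queued event after a simulated round trip.
>         Thread worker = new Thread(() -> {
>             try {
>                 while (true) {
>                     CompletableFuture<String> event = queue.take();
>                     Thread.sleep(50);                 // simulated network latency
>                     event.complete("offsets");
>                 }
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();   // exit the loop if interrupted
>             }
>         });
>         worker.setDaemon(true);                       // do not keep the JVM alive
>         worker.start();
>     }
>
>     /** Enqueue an event, then immediately block on its result for at most timeoutMs. */
>     public String addAndGet(long timeoutMs)
>             throws InterruptedException, ExecutionException, TimeoutException {
>         CompletableFuture<String> event = new CompletableFuture<>();
>         queue.add(event);
>         return event.get(timeoutMs, TimeUnit.MILLISECONDS);
>     }
>
>     public static void main(String[] args) throws Exception {
>         ZeroTimeoutDemo demo = new ZeroTimeoutDemo();
>         try {
>             demo.addAndGet(0);                        // like poll(Duration.ofMillis(0))
>             System.out.println("timeout 0: completed");
>         } catch (TimeoutException e) {
>             System.out.println("timeout 0: TimeoutException");
>         }
>         System.out.println("timeout 1000: " + demo.addAndGet(1000));
>     }
> }
> {code}
> With a zero timeout, {{get()}} gives up before the worker can complete the 
> event, so {{main()}} prints {{timeout 0: TimeoutException}}; with a 1000 ms 
> timeout the same call returns {{offsets}}. Note that this sketch throws 
> {{java.util.concurrent.TimeoutException}} directly.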
> h3. Suggested fix
> TBD :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
