[ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
------------------------------
    Priority: Blocker  (was: Major)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-15848
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15848
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, system tests
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: consumer-threading-refactor, timeout
>             Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and 
> {{AsyncKafkaConsumer}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} enforces the timeout literally, whereas 
> {{LegacyKafkaConsumer}} effectively allows some leeway.
> {{LegacyKafkaConsumer}} is structured so that it checks whether an 
> operation succeeded _before_ it checks the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture<Integer> future = sendSomeRequest(partitions);
>         client.poll(future, timer);
>         if (future.isDone())
>             return future.get();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but the method then 
> _immediately_ invokes {{Future.get()}} with the timeout to implement a 
> time-bounded blocking call. Because the method is called with a timeout of 
> 0 and the operation has not yet had a chance to complete, {{Future.get()}} 
> _immediately_ throws a {{TimeoutException}}.
> h3. Suggested fix
> TBD :(
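
The behavior described under "How to reproduce" can also be seen in isolation, outside Kafka. The following is a minimal sketch using only java.util.concurrent; it is not a proposed fix and is not taken from the Kafka code base. The names TimeoutSemanticsDemo, literalGet, checkResultFirst, and pollOnce are hypothetical, and pollOnce is only assumed to stand in for a single pass of NetworkClient.poll() that happens to complete the operation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-alone demo; none of these names exist in Kafka.
final class TimeoutSemanticsDemo {

    // "Literal" style: the timeout is enforced by Future.get() itself, so a
    // pending future combined with a timeout of 0 fails immediately.
    static int literalGet(CompletableFuture<Integer> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return -1;
        } catch (ExecutionException e) {
            return -1;
        }
    }

    // "Lenient" style: do one unit of work, check the result, and only then
    // consult the deadline. Even with a timeout of 0 the loop body runs once.
    static int checkResultFirst(CompletableFuture<Integer> future,
                                Runnable pollOnce,
                                long timeoutMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            pollOnce.run(); // assumed stand-in for one network poll pass
            if (future.isDone())
                return future.isCompletedExceptionally() ? -1 : future.getNow(-1);
        } while (System.nanoTime() - deadlineNanos < 0);
        return -1;
    }

    public static void main(String[] args) {
        // Nothing completes this future, so get(0, ...) times out at once.
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        System.out.println(literalGet(pending, 0));

        // Here the single poll pass completes the future before the timer is
        // checked, so the result is returned despite the timeout of 0.
        CompletableFuture<Integer> polled = new CompletableFuture<>();
        System.out.println(checkResultFirst(polled, () -> polled.complete(42), 0));
    }
}
```

With a timeout of 0, literalGet returns -1 for a future that is still pending, while checkResultFirst returns the value as long as its one guaranteed pass completes the operation. That single guaranteed pass is the leeway the issue describes.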



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
