[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-15848: ------------------------------ Priority: Blocker (was: Major) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -------------------------------------------------------------------------- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests > Reporter: Kirk True > Assignee: Kirk True > Priority: Blocker > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and > {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API. Here's a bit of code that illustrates the difference between the two > approaches. > {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture<Integer> future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.get(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture<Integer> future = new CompleteableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0, it > _immediately_ throws a {{{}TimeoutException{}}}. > h3. Suggested fix > TBD :( -- This message was sent by Atlassian Jira (v8.20.10#820010)