[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-15848:
------------------------------
    Summary: Consumer API timeout inconsistent between ConsumerDelegate implementations  (was: Consumer API timeout inconsistent between LegacyKafkaConsumer and AsyncKafkaConsumer)

Consumer API timeout inconsistent between ConsumerDelegate implementations
--------------------------------------------------------------------------

Key: KAFKA-15848
URL: https://issues.apache.org/jira/browse/KAFKA-15848
Project: Kafka
Issue Type: Bug
Components: clients, consumer, unit tests
Reporter: Kirk True
Priority: Major
Labels: consumer-threading-refactor, kip-848-client-support, kip-848-preview

The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations differ fundamentally in when they consult the {{Timer}} they are given.

h3. tl;dr

For time-bounded tasks, {{LegacyKafkaConsumer}} does the following:
# Attempt the operation
# If successful, return result
# Check timer, return if expired
# Go to top

{{AsyncKafkaConsumer}} effectively does the inverse:
# Check timer, return if expired
# Attempt the operation
# If successful, return result
# Go to top

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} always makes at least one attempt before checking the timer, which gives it a little wiggle room.

h3. How to reproduce

This causes subtle timing issues, but they can be easily reproduced via the {{KafkaConsumerTest}} unit tests, e.g. {{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}. This test invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length timeout.

As part of the {{poll()}} logic, the consumer may need to refresh offsets.
To accomplish this, {{LegacyKafkaConsumer}} uses the {{ConsumerCoordinator.fetchCommittedOffsets()}} method, which is structured like this:

{code:java}
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) {
    do {
        // Attempt the operation first...
        final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
        . . .
        client.poll(future, timer);
        . . .
        return future.value();
        // ...and only check the timer after the attempt
    } while (timer.notExpired());
    return null;
}
{code}

The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:

{code:java}
private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
    Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
    try {
        OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
        // Enqueues the event for the network I/O thread, then blocks for at most the time left on the timer
        Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
        . . .
        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}

The call to {{addAndGet}} enqueues the operation on the queue for the network I/O thread and then _immediately_ invokes {{Future.get()}} with the timeout to implement a time-bounded blocking call. {{Future.get()}} is passed a value of {{0}} (from the above call to {{poll(0)}}), so it _immediately_ throws a {{TimeoutException}}, before the network I/O thread has had a chance to process the event.

h3. Suggested fix

TBD :(
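h3. Sketch of the two orderings

The ordering difference from the tl;dr can be reproduced outside Kafka. The following is a minimal, self-contained sketch, not Kafka code: {{SimpleTimer}} and {{fetchOffsets()}} are hypothetical stand-ins for {{Timer}} and the offset fetch, the operation is assumed to take about 200 ms and then succeed, and a single-thread executor plays the role of the network I/O thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimerOrderingDemo {

    // Hypothetical stand-in for Kafka's Timer: only tracks a deadline in milliseconds.
    static final class SimpleTimer {
        private final long deadlineMs;

        SimpleTimer(long timeoutMs) {
            this.deadlineMs = System.currentTimeMillis() + timeoutMs;
        }

        long remainingMs() {
            return Math.max(0L, deadlineMs - System.currentTimeMillis());
        }

        boolean notExpired() {
            return remainingMs() > 0L;
        }
    }

    // Hypothetical operation (standing in for an offset fetch): takes ~200 ms, then succeeds.
    static boolean fetchOffsets() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    // Legacy ordering: attempt first, check the timer afterwards.
    // Even with a zero timeout, the operation runs once.
    static boolean legacyStyle(long timeoutMs) {
        SimpleTimer timer = new SimpleTimer(timeoutMs);
        do {
            if (fetchOffsets()) {
                return true; // success is returned before the timer is consulted
            }
        } while (timer.notExpired());
        return false;
    }

    // Async ordering: hand the operation to a background thread, then block
    // on Future.get() for only the time remaining on the timer.
    static boolean asyncStyle(long timeoutMs) throws InterruptedException, ExecutionException {
        SimpleTimer timer = new SimpleTimer(timeoutMs);
        ExecutorService backgroundThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Boolean> future =
                    CompletableFuture.supplyAsync(TimerOrderingDemo::fetchOffsets, backgroundThread);
            // With 0 ms remaining, get() throws TimeoutException right away,
            // before the background thread can finish the operation.
            return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false;
        } finally {
            backgroundThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("legacy, timeout 0:    " + legacyStyle(0));
        System.out.println("async,  timeout 0:    " + asyncStyle(0));
        System.out.println("async,  timeout 2000: " + asyncStyle(2000));
    }
}
```

With a zero timeout, {{legacyStyle(0)}} still performs one full attempt and reports success, while {{asyncStyle(0)}} gives up at once because {{CompletableFuture.get(0, TimeUnit.MILLISECONDS)}} throws {{TimeoutException}} when the future is not yet complete. Given a generous timeout, {{asyncStyle(2000)}} succeeds as well, which is why the discrepancy only surfaces with very short timeouts such as {{poll(Duration.ofMillis(0))}}.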