kirktrue commented on code in PR #15455: URL: https://github.com/apache/kafka/pull/15455#discussion_r1511613214
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -937,13 +938,14 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition return Collections.emptyMap(); } + final Timer timer = time.timer(timeout); final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent( partitions, - timeout.toMillis()); + timer); wakeupTrigger.setActiveTask(event.future()); try { final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event, - time.timer(timeout)); + timer); Review Comment: You're correct on both counts! 😄 > 1. Could you also not pass the timer to `addAndGet()` and use the timer of the event? To me, it seems like the timer passed to the event is always the timer also passed to `addAndGet()`. Indeed it is the same `Timer` object. I intend to resolve that confusion as part of KAFKA-15974 and to keep this one smaller in scope. > 2. Is it correct to use `timer.remainingMs()` when waiting for the future of the event? Do we not risk to timeout the future before we check the timer for expiration? It is not, but the proper fix to change the underlying design is more substantial, so I was saving that change as part of KAFKA-15974. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org