kirktrue commented on code in PR #15455:
URL: https://github.com/apache/kafka/pull/15455#discussion_r1511613214


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -937,13 +938,14 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
                 return Collections.emptyMap();
             }
 
+            final Timer timer = time.timer(timeout);
             final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(
                 partitions,
-                timeout.toMillis());
+                timer);
             wakeupTrigger.setActiveTask(event.future());
             try {
                 final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event,
-                    time.timer(timeout));
+                    timer);

Review Comment:
   You're correct on both counts! 😄 
   
   > 1. Could you also not pass the timer to `addAndGet()` and use the timer of the event? To me, it seems like the timer passed to the event is always the timer also passed to `addAndGet()`.
   
   Indeed, it is the same `Timer` object. I intend to remove that redundancy as part of KAFKA-15974 so that this PR stays smaller in scope.
   
   > 2. Is it correct to use `timer.remainingMs()` when waiting for the future of the event? Do we not risk to timeout the future before we check the timer for expiration?
   
   It is not correct. The proper fix requires a more substantial change to the underlying design, so I am deferring it to KAFKA-15974.
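   
   For illustration only, here is a minimal, self-contained sketch of what the first point could look like: the event owns its deadline, and `addAndGet()` takes only the event. `EventTimerSketch`, `TimedEvent`, and this `addAndGet()` signature are hypothetical stand-ins built on `java.util.concurrent`; they are not the Kafka classes and not necessarily the design that KAFKA-15974 will adopt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventTimerSketch {

    /** Hypothetical stand-in for an application event that carries its own deadline. */
    static final class TimedEvent<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();
        private final long deadlineNanos;

        TimedEvent(long timeoutMs) {
            // Fix the deadline once, at construction, so there is a single source of truth.
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        }

        CompletableFuture<T> future() {
            return future;
        }

        /** Milliseconds left until the deadline, never negative. */
        long remainingMs() {
            long remainingNanos = deadlineNanos - System.nanoTime();
            return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
        }
    }

    /** Hypothetical addAndGet() that reads the timeout from the event instead of a second argument. */
    static <T> T addAndGet(TimedEvent<T> event)
            throws InterruptedException, ExecutionException, TimeoutException {
        // A real handler would enqueue the event for the background thread here.
        return event.future().get(event.remainingMs(), TimeUnit.MILLISECONDS);
    }
}
```

   Note that this sketch still waits on the future with the remaining time, so it only addresses the first point, not the second.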



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.