lianetm commented on PR #16241:
URL: https://github.com/apache/kafka/pull/16241#issuecomment-2163819417

   Hey @kirktrue, I don't expect the code snippets you shared in attempt 1 to work,
because in the end they still create a new event every time.
   
   The approach I had in mind is more like what the legacy consumer does by keeping
a `PendingCommittedOffsetRequest`: we keep a `pendingFetchEvent` that we reuse across
polls as long as the set of partitions is the same. Then we simply need to update
`initWithCommittedOffsetsIfNeeded` so that, instead of:
   
   >             final FetchCommittedOffsetsEvent event =
   >                 new FetchCommittedOffsetsEvent(
   >                     initializingPartitions,
   >                     calculateDeadlineMs(timer));
   >             wakeupTrigger.setActiveTask(event.future());
   >             final Map<TopicPartition, OffsetAndMetadata> offsets =
   >                 applicationEventHandler.addAndGet(event);
   
   We would:
   
   >             // TODO: also handle the case where the set of initializingPartitions
   >             // is not the same; that would need a new request
   >             if (pendingFetchEvent == null) {
   >                 pendingFetchEvent = new FetchCommittedOffsetsEvent(
   >                     initializingPartitions,
   >                     Long.MAX_VALUE);
   >                 wakeupTrigger.setActiveTask(pendingFetchEvent.future());
   >                 applicationEventHandler.add(pendingFetchEvent);
   >             }
   > 
   >             // final Map<TopicPartition, OffsetAndMetadata> offsets =
   >             //     applicationEventHandler.addAndGet(event);
   >             final Map<TopicPartition, OffsetAndMetadata> offsets =
   >                 ConsumerUtils.getResult(pendingFetchEvent.future(), timer);
   >             pendingFetchEvent = null;
   
   With that, the new integration test passes! This is only a rough attempt: it still
needs the TODO, and we should also consider how to expire that `FetchCommittedOffsetsEvent`
with the `Long.MAX_VALUE` deadline, since it could otherwise stay in the background forever.
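
   For the partition check in the TODO, a minimal sketch of what I had in mind
(assuming `FetchCommittedOffsetsEvent` exposes the set of partitions it was created
with, e.g. through a `partitions()` getter; if it doesn't, we would have to track
that set alongside `pendingFetchEvent`):

   >             // Reuse the pending event only if it covers the same partitions;
   >             // otherwise drop the stale one and submit a fresh request.
   >             if (pendingFetchEvent == null
   >                     || !pendingFetchEvent.partitions().equals(initializingPartitions)) {
   >                 pendingFetchEvent = new FetchCommittedOffsetsEvent(
   >                     initializingPartitions,
   >                     Long.MAX_VALUE);
   >                 wakeupTrigger.setActiveTask(pendingFetchEvent.future());
   >                 applicationEventHandler.add(pendingFetchEvent);
   >             }

   Dropping the stale event there might also be a natural place to handle the expiry
concern (e.g. completing its future exceptionally so it doesn't linger in the
background), but I haven't tried that part.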
   
   With this we would have a really simple approach, very similar to the legacy one,
which feels safe, but I could be missing details of why it wouldn't work. Just sharing
in case it helps; I'm happy to review the current PR approach and align on it if you
think it's best at this point.
   
    

