chia7712 commented on code in PR #16310: URL: https://github.com/apache/kafka/pull/16310#discussion_r1640111560
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1666,23 +1668,68 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { return true; log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); + + // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle + // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created + // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the + // case it times out, subsequent attempts will also use the event in order to wait for the results. + if (!canReusePendingOffsetFetchEvent(initializingPartitions)) { + // Give the event a reasonable amount of time to complete. + final long timeoutMs = Math.max(defaultApiTimeoutMs, timer.remainingMs()); + final long deadlineMs = calculateDeadlineMs(time, timeoutMs); + pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs); + applicationEventHandler.add(pendingOffsetFetchEvent); + } + + final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = pendingOffsetFetchEvent.future(); + boolean shouldClearPendingEvent = false; + try { - final FetchCommittedOffsetsEvent event = - new FetchCommittedOffsetsEvent( - initializingPartitions, - calculateDeadlineMs(timer)); - wakeupTrigger.setActiveTask(event.future()); - final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event); + wakeupTrigger.setActiveTask(future); + final Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(future, timer); + + // Clear the pending event once its result is successfully retrieved. 
+ shouldClearPendingEvent = true; + refreshCommittedOffsets(offsets, metadata, subscriptions); return true; } catch (TimeoutException e) { log.error("Couldn't refresh committed offsets before timeout expired"); return false; + } catch (InterruptException e) { + throw e; + } catch (Throwable t) { + // Clear the pending event on errors that are not timeout- or interrupt-related. + shouldClearPendingEvent = true; + throw ConsumerUtils.maybeWrapAsKafkaException(t); } finally { + if (shouldClearPendingEvent) + pendingOffsetFetchEvent = null; + wakeupTrigger.clearTask(); } } + /** + * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse + * is only possible if all the following conditions are true: + * + * <ul> + * <li>A pending offset fetch event exists</li> + * <li>The partition set of the pending offset fetch event is the same as the given partition set</li> + * <li>The pending offset fetch event has not expired</li> + * </ul> + */ + private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) { + if (pendingOffsetFetchEvent == null) + return false; + + if (!pendingOffsetFetchEvent.partitions().equals(partitions)) Review Comment: > can I file a new Jira to implement this when we have a little more time to investigate and test? Sure, and thanks for sharing. I wasn't even aware of that "match the behavior" requirement before :smiling_face_with_tear: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org