kirktrue commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1640101706


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1666,23 +1668,68 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
             return true;
 
         log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
+
+        // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in order to wait for the results.
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            // Give the event a reasonable amount of time to complete.
+            final long timeoutMs = Math.max(defaultApiTimeoutMs, timer.remainingMs());
+            final long deadlineMs = calculateDeadlineMs(time, timeoutMs);
+            pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
+            applicationEventHandler.add(pendingOffsetFetchEvent);
+        }
+
+        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = pendingOffsetFetchEvent.future();
+        boolean shouldClearPendingEvent = false;
+
         try {
-            final FetchCommittedOffsetsEvent event =
-                new FetchCommittedOffsetsEvent(
-                    initializingPartitions,
-                    calculateDeadlineMs(timer));
-            wakeupTrigger.setActiveTask(event.future());
-            final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event);
+            wakeupTrigger.setActiveTask(future);
+            final Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(future, timer);
+
+            // Clear the pending event once its result is successfully retrieved.
+            shouldClearPendingEvent = true;
+
             refreshCommittedOffsets(offsets, metadata, subscriptions);
             return true;
         } catch (TimeoutException e) {
             log.error("Couldn't refresh committed offsets before timeout expired");
             return false;
+        } catch (InterruptException e) {
+            throw e;
+        } catch (Throwable t) {
+            // Clear the pending event on errors that are not timeout- or interrupt-related.
+            shouldClearPendingEvent = true;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
         } finally {
+            if (shouldClearPendingEvent)
+                pendingOffsetFetchEvent = null;
+
             wakeupTrigger.clearTask();
         }
     }
 
+    /**
+     * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse
+     * is only possible if all the following conditions are true:
+     *
+     * <ul>
+     *     <li>A pending offset fetch event exists</li>
+     *     <li>The partition set of the pending offset fetch event is the same as the given partition set</li>
+     *     <li>The pending offset fetch event has not expired</li>
+     * </ul>
+     */
+    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
+        if (pendingOffsetFetchEvent == null)
+            return false;
+
+        if (!pendingOffsetFetchEvent.partitions().equals(partitions))

Review Comment:
   @chia7712, that's definitely an interesting optimization!
   
   IIUC, the suggestion is to relax the requirement to allow reuse if the 
partitions for the _current_ request are a subset of (or equal to) the 
_previous_ request, right? So basically:
   
   ```suggestion
           if (!pendingOffsetFetchEvent.partitions().containsAll(partitions))
   ```
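   
   To make the difference between the two checks concrete, here is a minimal sketch. It is not the consumer code: plain strings stand in for `TopicPartition`, and the class and method names (`ReuseCheckSketch`, `canReuseExact`, `canReuseSubset`) are made up for illustration.
   
   ```java
   import java.util.Set;

   public class ReuseCheckSketch {
       // Strict rule (current PR): reuse only when both partition sets are identical.
       static boolean canReuseExact(Set<String> pending, Set<String> requested) {
           return pending.equals(requested);
       }

       // Relaxed rule (the suggestion): reuse when the pending set covers every requested partition.
       static boolean canReuseSubset(Set<String> pending, Set<String> requested) {
           return pending.containsAll(requested);
       }

       public static void main(String[] args) {
           // Hypothetical partition names, used only for this example.
           Set<String> pending = Set.of("topic-0", "topic-1", "topic-2");
           Set<String> requested = Set.of("topic-0", "topic-1");

           System.out.println(canReuseExact(pending, requested));  // false: the sets differ
           System.out.println(canReuseSubset(pending, requested)); // true: requested is a subset
           System.out.println(canReuseSubset(requested, pending)); // false: pending would miss topic-2
       }
   }
   ```
   
   The two rules agree whenever the sets are equal, so they only diverge when the current request asks for strictly fewer partitions than the pending one.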
   
   The existing `LegacyKafkaConsumer` allows reuse only if the partitions for the _current_ request **exactly** equal those of the _previous_ request ([source](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L927)). That logic is the basis for the behavior used in the `AsyncKafkaConsumer`. We've been very deliberate about matching the behavior of the two `Consumer` implementations as closely as possible, unless there's a specific reason not to.
   
   It's a small change, and it does make sense (to me). My main concern is that it introduces a subtle difference in behavior between the two `Consumer` implementations. Also, the specific case we're trying to solve with this change is when the user has passed in a very low timeout and we're in a tight `poll()` loop, which suggests the partitions wouldn't be changing between iterations of that loop (CMIIW).
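   
   For reference, the tight-loop case can be sketched roughly as follows. This is a simplified stand-in rather than the `AsyncKafkaConsumer` code: `PendingFetchSketch`, `startFetch`, `tryFetch`, and `fetchesStarted` are hypothetical names, and a bare `CompletableFuture<Long>` plays the role of the stored event's future.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class PendingFetchSketch {
       // Hypothetical stand-in for the future of the stored pending offset fetch event.
       CompletableFuture<Long> pending;
       int fetchesStarted = 0;

       // Stand-in for handing a new fetch to the background thread; it completes later.
       CompletableFuture<Long> startFetch() {
           fetchesStarted++;
           return new CompletableFuture<>();
       }

       // One attempt with a (possibly very short) timeout; returns null if it timed out.
       Long tryFetch(long timeoutMs) {
           if (pending == null)
               pending = startFetch();
           try {
               Long offset = pending.get(timeoutMs, TimeUnit.MILLISECONDS);
               pending = null; // result retrieved, so the pending fetch is cleared
               return offset;
           } catch (TimeoutException e) {
               return null; // keep the pending fetch so the next attempt can wait on it
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e); // pending fetch is kept on interrupt
           } catch (ExecutionException e) {
               pending = null; // other failures clear the pending fetch
               throw new IllegalStateException(e.getCause());
           }
       }

       public static void main(String[] args) {
           PendingFetchSketch sketch = new PendingFetchSketch();
           System.out.println(sketch.tryFetch(1));     // null: first attempt times out
           sketch.pending.complete(42L);               // simulate the fetch finishing later
           System.out.println(sketch.tryFetch(1));     // 42: second attempt reuses the same future
           System.out.println(sketch.fetchesStarted);  // 1: only one fetch was ever issued
       }
   }
   ```
   
   In this sketch the requested partitions never change between attempts, which is the situation described above, so the exact-match and subset rules would behave identically.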
   
   If I understand correctly, this seems like an optimization, rather than 
something needed for correctness. If that's the case, can I file a new Jira to 
implement this when we have a little more time to investigate and test?
   
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
