kirktrue commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1725990958
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -52,15 +59,27 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private final ConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
+ private final Time time;
+
+ /**
+ * OffsetFetch request triggered to update fetch positions. The request is kept. It will be
+ * cleared every time a response with the committed offsets is received and used to update
+ * fetch positions. If the response cannot be used because the UpdateFetchPositions expired,
+ * it will be kept to be used on the next attempt to update fetch positions if partitions
+ * remain the same.
+ */
+ private FetchCommittedOffsetsEvent pendingOffsetFetchEvent;
Review Comment:
Option 3 makes the most sense. I don't think we need to be afraid of making
a dedicated class for this, if needed.
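As a rough illustration of what such a dedicated class could look like, here is a minimal sketch. It assumes the class wraps the pending request together with the partitions it covers, as the Javadoc above describes. The name `PendingOffsetFetch`, the `fetcher` parameter, and the use of `String` in place of `TopicPartition` are all hypothetical and not taken from the PR. It also assumes access from a single (background) thread, so it does no synchronization.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical holder for an in-flight committed-offsets fetch; not part of the PR. */
final class PendingOffsetFetch {
    // Partitions covered by the pending fetch (String stands in for TopicPartition).
    private Set<String> partitions;
    // Future of the pending fetch, or null if nothing is pending.
    private CompletableFuture<Map<String, Long>> result;

    /** Reuse the pending fetch if it covers the same partitions; otherwise start a new one. */
    CompletableFuture<Map<String, Long>> getOrFetch(
            Set<String> requested,
            Function<Set<String>, CompletableFuture<Map<String, Long>>> fetcher) {
        if (result == null || !requested.equals(partitions)) {
            partitions = Set.copyOf(requested);
            result = fetcher.apply(partitions);
        }
        return result;
    }

    /** Clear once a response has been used to update fetch positions. */
    void clear() {
        partitions = null;
        result = null;
    }

    boolean isPending() {
        return result != null;
    }
}
```

The caller would invoke `clear()` only after the committed offsets were actually applied, so a response that arrives after the UpdateFetchPositions event expired stays available for the next attempt.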
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
}
}
- private void process(final ResetPositionsEvent event) {
- CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
- future.whenComplete(complete(event.future()));
+ /**
+ *
+ * Fetch committed offsets and use them to update positions in the subscription state. If no
+ * committed offsets available, fetch offsets from the leader.
+ */
+ private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+ try {
+ // The event could be completed in the app thread before it got to be
+ // processed in the background (ex. interrupted)
+ if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+ log.debug("UpdateFetchPositions event {} was completed exceptionally before it " +
+ "got time to be processed.", updateFetchPositionsEvent);
+ return;
+ }
+
+ // Validate positions using the partition leader end offsets, to detect if any partition
+ // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+ // request, retrieve the partition end offsets, and validate the current position
+ // against it. It will throw an exception if log truncation is detected.
+ requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
Review Comment:
@lianetm, thanks for the explanation. We don't want to require the position
validation to finish before performing the rest of the logic.
Out of curiosity, regarding "storming the broker with requests," does the
`OffsetsRequestManager` already handle duplicate concurrent requests?
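To make the question concrete, here is a minimal sketch of one generic way to coalesce duplicate in-flight requests. It says nothing about what `OffsetsRequestManager` actually does; `InflightDeduper`, `send`, and the `sender` parameter are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper that shares one in-flight future per request key. */
final class InflightDeduper<K, V> {
    private final Map<K, CompletableFuture<V>> inflight = new ConcurrentHashMap<>();

    /**
     * Returns the future of an identical request that is already in flight,
     * or invokes the sender to start a new one. The sender must not touch
     * this deduper, since it runs inside computeIfAbsent.
     */
    CompletableFuture<V> send(K key, Function<K, CompletableFuture<V>> sender) {
        CompletableFuture<V> future = inflight.computeIfAbsent(key, sender);
        // Registered on every call; remove(key, value) is idempotent, so
        // repeated callbacks are harmless. Once the request completes, the
        // next send for the same key starts a fresh request.
        future.whenComplete((value, error) -> inflight.remove(key, future));
        return future;
    }
}
```

With something along these lines, two callers asking for the same key while a request is outstanding would share a single request, and only a call made after completion would go out again.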