lianetm commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1727373275
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
        }
    }

-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+                log.debug("UpdateFetchPositions event {} was completed exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = subscriptions.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                updateFetchPositionsEvent.future().complete(true);
+                return;
+            }
+
+            // Reset positions using committed offsets retrieved from the group coordinator, for any
+            // partitions which do not have a valid position and are not awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
+            // will only do a coordinator lookup if there are partitions which have missing
+            // positions, so a consumer with manually assigned partitions can avoid a coordinator
+            // dependence by always ensuring that assigned partitions have an initial position.
+            if (requestManagers.commitRequestManager.isPresent()) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        // Retrieve partition offsets to init positions for partitions that still
+                        // don't have a valid position
+                        initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+                    } else {
+                        updateFetchPositionsEvent.future().completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+            }
+
+        } catch (Exception e) {
+            updateFetchPositionsEvent.future().completeExceptionally(maybeWrapAsKafkaException(e));
+        }
    }
-    private void process(final ValidatePositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    private void initWithPartitionOffsetsIfNeeded(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // If there are partitions still needing a position and a reset policy is defined,
+            // request reset using the default policy. If no reset strategy is defined and there
+            // are partitions with a missing position, then we will raise a
+            // NoOffsetForPartitionException exception.
+            subscriptions.resetInitializingPositions();
+
+            // Reset positions using partition offsets retrieved from the leader, for any partitions
+            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
+            // partition offsets according to the strategy (ex. earliest, latest), and update the
+            // positions.
+            CompletableFuture<Void> resetPositionsFuture = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+
+            resetPositionsFuture.whenComplete((result, error) -> {
+                if (updateFetchPositionsEvent.future().isDone()) {
+                    log.debug("UpdateFetchPositions event {} had already expired when reset " +
+                        "positions completed.", updateFetchPositionsEvent);
+                    return;
+                }
+                if (error == null) {
+                    updateFetchPositionsEvent.future().complete(false);
+                } else {
+                    updateFetchPositionsEvent.future().completeExceptionally(error);
+                }
+            });
+        } catch (Exception e) {
+            updateFetchPositionsEvent.future().completeExceptionally(e);
Review Comment:
Hmm, I wasn't completely sure about this, but after checking in detail it does seem right to wrap here. My concern was that we might mistakenly wrap something the API expects to surface as-is, given that we now perform in the background lots of operations that used to run in the app thread. But we do handle the expected errors in the updatePositions flow and requests, and use them to complete the future. So this wrapping really only covers unexpected errors in the flow, and it's fine to wrap. Done
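
FWIW, a minimal sketch of the wrapping semantics I'm relying on, assuming the helper behaves like `ConsumerUtils.maybeWrapAsKafkaException` (the exact shape here is an approximation, not the authoritative implementation):

```java
import org.apache.kafka.common.KafkaException;

final class WrappingSketch {

    // Sketch only: expected errors surfaced by the updatePositions flow are
    // already KafkaExceptions, so they pass through unchanged and the API
    // still sees the original type.
    static KafkaException maybeWrapAsKafkaException(Throwable t) {
        if (t instanceof KafkaException)
            return (KafkaException) t;
        // Anything else is an unexpected error in the background flow; wrap
        // it so the future always completes with a KafkaException.
        return new KafkaException(t);
    }
}
```

So completing the future with `maybeWrapAsKafkaException(e)` never hides an API-visible error behind a generic wrapper; it only normalizes the unexpected ones.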
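
And for context on the `isCompletedExceptionally()` / `isDone()` guards in the diff: the app thread may complete the event future first (e.g. on timeout or interrupt), so the background callbacks must check before completing it. A simplified, hypothetical sketch of that app-thread side (names and shape are illustrative, not the real event API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AppThreadSketch {

    // Hypothetical app-thread side: waits for the background result and
    // completes the shared future itself on timeout/interrupt, which is why
    // the background code re-checks the future before touching it.
    static boolean waitForUpdateFetchPositions(CompletableFuture<Boolean> future,
                                               long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Expire the event: the background thread will observe
            // future.isDone() and skip completing it again.
            future.completeExceptionally(e);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.completeExceptionally(e);
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```

The real code routes this through the consumer's event-handling machinery; the point is just that both threads can race to complete the same future, hence the guards.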