kirktrue commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1733075083
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1577,27 @@ private Fetch<K, V> collectFetch() {
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
-     *                                       defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException If authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
         try {
-            // Validate positions using the partition leader end offsets, to detect if any partition
-            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the group coordinator, for any
-            // partitions which do not have a valid position and are not awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which have missing
-            // positions, so a consumer with manually assigned partitions can avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an initial position.
-            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset policy is defined,
-            // request reset using the default policy. If no reset strategy is defined and there
-            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
+            wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
Review Comment:
Why do we need to involve the wakeup trigger for this case?
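For context, my understanding is that the wakeup trigger exists so a concurrent `KafkaConsumer.wakeup()` can abort a user-facing blocking call. If it really is needed here, I'd have expected the registration to be paired with a `clearTask()` in a `finally`, roughly like this (a sketch only, assuming `WakeupTrigger.clearTask()` is the right counterpart for `setActiveTask`):
```java
// Sketch, not the PR's code: register the event's future so that a concurrent
// wakeup() can complete it exceptionally with WakeupException, and always
// deregister it once the blocking wait is over.
UpdateFetchPositionsEvent event = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(event.future());
try {
    return applicationEventHandler.addAndGet(event);
} finally {
    wakeupTrigger.clearTask();
}
```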
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +253,14 @@ private void process(final UnsubscribeEvent event) {
         }
     }
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
-    }
-
-    private void process(final ValidatePositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     * Fetch committed offsets and use them to update positions in the subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        CompletableFuture<Boolean> future =
+            requestManagers.offsetsRequestManager.updateFetchPositions(updateFetchPositionsEvent.deadlineMs());
+        future.whenComplete(complete(updateFetchPositionsEvent.future()));
Review Comment:
Super nit: any reason not to name the variable `event` like in the other methods? That'll help reduce visual clutter and make the lines a bit shorter:
```suggestion
    private void process(final UpdateFetchPositionsEvent event) {
        CompletableFuture<Boolean> future =
            requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
        future.whenComplete(complete(event.future()));
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -116,15 +138,27 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,
         this.subscriptionState = subscriptionState;
         this.time = time;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.apiVersions = apiVersions;
         this.networkClientDelegate = networkClientDelegate;
-        this.backgroundEventHandler = backgroundEventHandler;
         this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
                 time, retryBackoffMs, apiVersions);
         // Register the cluster metadata update callback. Note this only relies on the
         // requestsToRetry initialized above, and won't be invoked until all managers are
         // initialized and the network thread started.
         this.metadata.addClusterUpdateListener(this);
+        this.commitRequestManager = commitRequestManager;
+    }
+
+    private static class PendingFetchCommittedRequest {
+        final Set<TopicPartition> requestedPartitions;
+        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+
+        private PendingFetchCommittedRequest(final Set<TopicPartition> requestedPartitions,
+                                     final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
Review Comment:
Super-duper nit: parameter alignment:
```suggestion
                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                 result.fetchedOffsets));
     }
+    private boolean maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered asynchronously here in the
+                // background thread, so they may complete (with error)
+                // when the triggering UpdateFetchPositionsEvent has been already expired. Keep
+                // exception saved to be thrown on the next call to update positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
Review Comment:
Super-nit: extra newline.
```suggestion
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -42,6 +42,7 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;
+
Review Comment:
Super nit: extra whitespace:
```suggestion
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                 result.fetchedOffsets));
     }
+    private boolean maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered asynchronously here in the
+                // background thread, so they may complete (with error)
+                // when the triggering UpdateFetchPositionsEvent has been already expired. Keep
+                // exception saved to be thrown on the next call to update positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = subscriptionState.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                result.complete(true);
+                return result;
+            }
+
+            // Reset positions using committed offsets retrieved from the group coordinator, for any
+            // partitions which do not have a valid position and are not awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the offsets retrieved.
+            if (commitRequestManager != null) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(deadlineMs);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        initWithPartitionOffsetsIfNeeded(result);
+                    } else {
+                        result.completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(result);
+            }
+        } catch (Exception e) {
+            result.completeExceptionally(maybeWrapAsKafkaException(e));
+        }
+        return result;
+    }
+
+    /**
+     * If there are partitions still needing a position and a reset policy is defined, request
+     * reset using the default policy.
+     *
+     * @param result Future that will complete when the reset operation completes.
+     * @throws NoOffsetForPartitionException If no reset strategy is configured
+     */
+    private void initWithPartitionOffsetsIfNeeded(CompletableFuture<Boolean> result) {
+        try {
+            // Mark partitions that need reset, using the configured reset strategy. If no
+            // strategy is defined, this will raise a NoOffsetForPartitionException exception.
+            subscriptionState.resetInitializingPositions();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+            return;
+        }
+
+        // For partitions awaiting reset, generate a ListOffset request to retrieve the partition
+        // offsets according to the strategy (ex. earliest, latest), and update the positions.
+        resetPositionsIfNeeded().whenComplete((resetResult, error) -> {
+            if (error == null) {
+                result.complete(false);
+            } else {
+                result.completeExceptionally(error);
+            }
+        });
+    }
+
+    // Visible for testing
+    boolean hasPendingOffsetFetchEvent() {
+        return pendingOffsetFetchEvent != null;
+    }
+
+    /**
+     * Fetch the committed offsets for partitions that require initialization. Use them to set
+     * the fetch positions in the subscription state.
+     *
+     * @throws TimeoutException If offsets could not be retrieved within the timeout
+     */
+    private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long deadlineMs) {
+        final Set<TopicPartition> initializingPartitions = subscriptionState.initializingPartitions();
+
+        if (initializingPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in order to wait for the results.
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture;
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            if (pendingOffsetFetchEvent != null) {
+                // This will be the case where we were waiting for a fetch committed offsets request
+                // to update positions, but the set of initializing partitions changed. We need to
+                // cancel the pending future, to ensure that it's results are not used to update
+                pendingOffsetFetchEvent.result.cancel(true);
+            }
+            // Need to generate a new request to fetch committed offsets
+            final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs);
+            fetchCommittedFuture = commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs);
+            pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions, fetchCommittedFuture);
+        } else {
+            fetchCommittedFuture = pendingOffsetFetchEvent.result;
+        }
+
+        // when the ongoing OffsetFetch completes, carry on with updating positions and
+        // completing the result future for the current attempt to initWithCommittedOffsetsIfNeeded
+        fetchCommittedFuture.whenComplete((offsets, error) -> {
+            pendingOffsetFetchEvent = null;
+            if (error instanceof CancellationException) {
+                // Abort updating positions
+                return;
+            }
+            // If an offset fetch triggered to update positions finishes without being
+            // cancelled, we update positions even if the original event expired. The event
+            // is cancelled whenever the set of partitions to initialize changes
+            if (error == null) {
+                refreshCommittedOffsets(offsets, metadata, subscriptionState);
+                result.complete(null);
+            } else {
+                log.error("Error fetching committed offsets to update positions", error);
+                result.completeExceptionally(error);
+            }
+        });
+
+        return result;
+    }
+
+    /**
+     * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse
+     * is only possible if all the following conditions are true:
+     *
+     * <ul>
+     *     <li>A pending offset fetch event exists</li>
+     *     <li>The partition set of the pending offset fetch event is the same as the given partition set</li>
+     * </ul>
+     */
+    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
+        if (pendingOffsetFetchEvent == null) {
+            return false;
+        }
+
+        return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
+    }
Review Comment:
The corresponding code in
`AsyncKafkaConsumer.canReusePendingOffsetFetchEvent` also checked that the
`pendingOffsetFetchEvent`’s deadline hadn't passed. Is that no longer a concern
after the refactor?
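For reference, something along these lines is what I had in mind (a sketch only; it assumes the pending request also records the `deadlineMs` it was created with, which the `PendingFetchCommittedRequest` in this PR doesn't currently store):
```java
// Hypothetical sketch: only reuse the pending fetch if it targets the same
// partitions and its deadline hasn't already passed.
private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
    if (pendingOffsetFetchEvent == null) {
        return false;
    }

    if (pendingOffsetFetchEvent.deadlineMs < time.milliseconds()) {
        return false;  // deadlineMs is an assumed field, not in the PR
    }

    return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
}
```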
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -90,9 +88,9 @@ public class OffsetsRequestManagerTest {
     private OffsetsRequestManager requestManager;
     private ConsumerMetadata metadata;
     private SubscriptionState subscriptionState;
-    private MockTime time;
+    private final Time time = mock(Time.class);
Review Comment:
Out of curiosity, what prompted this change?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                 result.fetchedOffsets));
     }
+    private boolean maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered asynchronously here in the
+                // background thread, so they may complete (with error)
+                // when the triggering UpdateFetchPositionsEvent has been already expired. Keep
+                // exception saved to be thrown on the next call to update positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = subscriptionState.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                result.complete(true);
+                return result;
+            }
Review Comment:
I don't think we need the temporary variable with the refactored code.
```suggestion
            if (subscriptionState.hasAllFetchPositions()) {
                result.complete(true);
                return result;
            }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]