This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
     new fc267f4eb8c KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout (#16031)
fc267f4eb8c is described below

commit fc267f4eb8ca1c3ca13117e2534f9d8b54d924fa
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Fri Jun 7 00:53:27 2024 -0700

    KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout (#16031)

    Improve consistency and correctness for user-provided timeouts at the Consumer network request layer, per the Java client Consumer timeouts design (https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts). While the changes introduced in KAFKA-15974 enforce timeouts at the Consumer's event layer, this change enforces timeouts at the network request layer.

    The changes mostly fit into the following areas:

    1. Create shared code and idioms so timeout handling logic is consistent across current and future RequestManager implementations
    2. Use deadlineMs instead of expirationMs, expirationTimeoutMs, retryExpirationTimeMs, timeoutMs, etc.
    3.
Update "preemptive pruning" to remove expired requests that have had at least one attempt Reviewers: Lianet Magrans <liane...@gmail.com>, Bruno Cadonna <cado...@apache.org> --- .../consumer/internals/CommitRequestManager.java | 133 ++++++++---------- .../consumer/internals/MembershipManagerImpl.java | 4 +- .../consumer/internals/NetworkClientDelegate.java | 11 +- .../consumer/internals/RequestManagers.java | 1 + .../consumer/internals/TimedRequestState.java | 71 ++++++++++ .../internals/TopicMetadataRequestManager.java | 53 +++---- .../events/CompletableApplicationEvent.java | 1 + .../internals/CommitRequestManagerTest.java | 153 +++++++++++++-------- .../consumer/internals/ConsumerTestBuilder.java | 2 +- .../consumer/internals/TimedRequestStateTest.java | 96 +++++++++++++ .../internals/TopicMetadataRequestManagerTest.java | 1 + .../kafka/api/AbstractConsumerTest.scala | 7 +- .../kafka/api/PlaintextConsumerPollTest.scala | 13 ++ .../test/scala/unit/kafka/utils/TestUtils.scala | 5 +- .../tiered/storage/TieredStorageTestContext.java | 3 +- 15 files changed, 382 insertions(+), 172 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 577cf7dee6b..000797dba09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -69,6 +69,7 @@ import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate. 
import static org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRESS; public class CommitRequestManager implements RequestManager, MemberStateListener { + private final Time time; private final SubscriptionState subscriptions; private final LogContext logContext; private final Logger log; @@ -133,6 +134,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener final OptionalDouble jitter, final Metrics metrics) { Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets"); + this.time = time; this.logContext = logContext; this.log = logContext.logger(getClass()); this.pendingRequests = new PendingRequests(); @@ -205,6 +207,13 @@ public class CommitRequestManager implements RequestManager, MemberStateListener .orElse(Long.MAX_VALUE); } + private KafkaException maybeWrapAsTimeoutException(Throwable t) { + if (t instanceof TimeoutException) + return (TimeoutException) t; + else + return new TimeoutException(t); + } + /** * Generate a request to commit consumed offsets. Add the request to the queue of pending * requests to be sent out on the next call to {@link #poll(long)}. If there are empty @@ -245,7 +254,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { OffsetCommitRequestState requestState = createOffsetCommitRequest( subscriptions.allConsumed(), - Optional.empty()); + Long.MAX_VALUE); CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); // Reset timer to the interval (even if no request was generated), but ensure that if // the request completes with a retriable error, the timer is reset to send the next @@ -294,14 +303,14 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * complete exceptionally if the commit fails with a non-retriable error, or if the retry * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long retryExpirationTimeMs) { + public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long deadlineMs) { if (!autoCommitEnabled()) { return CompletableFuture.completedFuture(null); } CompletableFuture<Void> result = new CompletableFuture<>(); OffsetCommitRequestState requestState = - createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs); autoCommitSyncBeforeRevocationWithRetries(requestState, result); return result; } @@ -314,9 +323,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener result.complete(null); } else { if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { - if (error instanceof TimeoutException && requestAttempt.isExpired) { + if (requestAttempt.isExpired()) { log.debug("Auto-commit sync before revocation timed out and won't be retried anymore"); - result.completeExceptionally(error); + result.completeExceptionally(maybeWrapAsTimeoutException(error)); } else if (error instanceof UnknownTopicOrPartitionException) { log.debug("Auto-commit sync before revocation failed because topic or partition were deleted"); result.completeExceptionally(error); @@ -367,7 +376,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(null); } - OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty()); + OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Long.MAX_VALUE); pendingRequests.addOffsetCommitRequest(commitRequest); CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>(); @@ -385,28 +394,26 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * Commit offsets, retrying on 
expected retriable errors while the retry timeout hasn't expired. * * @param offsets Offsets to commit - * @param retryExpirationTimeMs Time until which the request will be retried if it fails with + * @param deadlineMs Time until which the request will be retried if it fails with * an expected retriable error. * @return Future that will complete when a successful response */ public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, - final long retryExpirationTimeMs) { + final long deadlineMs) { CompletableFuture<Void> result = new CompletableFuture<>(); - OffsetCommitRequestState requestState = createOffsetCommitRequest( - offsets, - Optional.of(retryExpirationTimeMs)); + OffsetCommitRequestState requestState = createOffsetCommitRequest(offsets, deadlineMs); commitSyncWithRetries(requestState, result); return result; } private OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs) { + final long deadlineMs) { return jitter.isPresent() ? 
new OffsetCommitRequestState( offsets, groupId, groupInstanceId, - expirationTimeMs, + deadlineMs, retryBackoffMs, retryBackoffMaxMs, jitter.getAsDouble(), @@ -415,7 +422,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener offsets, groupId, groupInstanceId, - expirationTimeMs, + deadlineMs, retryBackoffMs, retryBackoffMaxMs, memberInfo); @@ -432,9 +439,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener result.complete(null); } else { if (error instanceof RetriableException) { - if (error instanceof TimeoutException && requestAttempt.isExpired) { + if (requestAttempt.isExpired()) { log.info("OffsetCommit timeout expired so it won't be retried anymore"); - result.completeExceptionally(error); + result.completeExceptionally(maybeWrapAsTimeoutException(error)); } else { requestAttempt.resetFuture(); commitSyncWithRetries(requestAttempt, result); @@ -465,7 +472,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}. * * @param partitions Partitions to fetch offsets for. - * @param expirationTimeMs Time until which the request should be retried if it fails + * @param deadlineMs Time until which the request should be retried if it fails * with expected retriable errors. * @return Future that will complete when a successful response is received, or the request * fails and cannot be retried. 
Note that the request is retried whenever it fails with @@ -473,31 +480,31 @@ public class CommitRequestManager implements RequestManager, MemberStateListener */ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets( final Set<TopicPartition> partitions, - final long expirationTimeMs) { + final long deadlineMs) { if (partitions.isEmpty()) { return CompletableFuture.completedFuture(Collections.emptyMap()); } CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>(); - OffsetFetchRequestState request = createOffsetFetchRequest(partitions, expirationTimeMs); + OffsetFetchRequestState request = createOffsetFetchRequest(partitions, deadlineMs); fetchOffsetsWithRetries(request, result); return result; } private OffsetFetchRequestState createOffsetFetchRequest(final Set<TopicPartition> partitions, - final long expirationTimeMs) { + final long deadlineMs) { return jitter.isPresent() ? new OffsetFetchRequestState( partitions, retryBackoffMs, retryBackoffMaxMs, - expirationTimeMs, + deadlineMs, jitter.getAsDouble(), memberInfo) : new OffsetFetchRequestState( partitions, retryBackoffMs, retryBackoffMaxMs, - expirationTimeMs, + deadlineMs, memberInfo); } @@ -516,8 +523,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener result.complete(res); } else { if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { - if (error instanceof TimeoutException && fetchRequest.isExpired) { - result.completeExceptionally(error); + if (fetchRequest.isExpired()) { + log.debug("OffsetFetch request for {} timed out and won't be retried anymore", fetchRequest.requestedPartitions); + result.completeExceptionally(maybeWrapAsTimeoutException(error)); } else { fetchRequest.resetFuture(); fetchOffsetsWithRetries(fetchRequest, result); @@ -612,12 +620,12 @@ public class CommitRequestManager implements RequestManager, MemberStateListener OffsetCommitRequestState(final 
Map<TopicPartition, OffsetAndMetadata> offsets, final String groupId, final Optional<String> groupInstanceId, - final Optional<Long> expirationTimeMs, + final long deadlineMs, final long retryBackoffMs, final long retryBackoffMaxMs, final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, - retryBackoffMaxMs, memberInfo, expirationTimeMs); + retryBackoffMaxMs, memberInfo, deadlineTimer(time, deadlineMs)); this.offsets = offsets; this.groupId = groupId; this.groupInstanceId = groupInstanceId; @@ -628,13 +636,13 @@ public class CommitRequestManager implements RequestManager, MemberStateListener OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> offsets, final String groupId, final Optional<String> groupInstanceId, - final Optional<Long> expirationTimeMs, + final long deadlineMs, final long retryBackoffMs, final long retryBackoffMaxMs, final double jitter, final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, - retryBackoffMaxMs, jitter, memberInfo, expirationTimeMs); + retryBackoffMaxMs, jitter, memberInfo, deadlineTimer(time, deadlineMs)); this.offsets = offsets; this.groupId = groupId; this.groupInstanceId = groupInstanceId; @@ -780,40 +788,24 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * Represents a request that can be retried or aborted, based on member ID and epoch * information. */ - abstract class RetriableRequestState extends RequestState { + abstract class RetriableRequestState extends TimedRequestState { /** * Member info (ID and epoch) to be included in the request if present. */ final MemberInfo memberInfo; - /** - * Time until which the request should be retried if it fails with retriable - * errors. If not present, the request is triggered without waiting for a response or - * retrying. 
- */ - private final Optional<Long> expirationTimeMs; - - /** - * True if the request expiration time has been reached. This is set when validating the - * request expiration on {@link #poll(long)} before sending it. It is used to know if a - * request should be retried on TimeoutException. - */ - boolean isExpired; - RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, - long retryBackoffMaxMs, MemberInfo memberInfo, Optional<Long> expirationTimeMs) { - super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); + long retryBackoffMaxMs, MemberInfo memberInfo, Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, timer); this.memberInfo = memberInfo; - this.expirationTimeMs = expirationTimeMs; } // Visible for testing RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, int retryBackoffExpBase, - long retryBackoffMaxMs, double jitter, MemberInfo memberInfo, Optional<Long> expirationTimeMs) { - super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter); + long retryBackoffMaxMs, double jitter, MemberInfo memberInfo, Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter, timer); this.memberInfo = memberInfo; - this.expirationTimeMs = expirationTimeMs; } /** @@ -828,13 +820,12 @@ public class CommitRequestManager implements RequestManager, MemberStateListener abstract CompletableFuture<?> future(); /** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. + * Complete the request future with a TimeoutException if the request has been sent out + * at least once and the timeout has been reached. 
*/ - void maybeExpire(long currentTimeMs) { - if (retryTimeoutExpired(currentTimeMs)) { + void maybeExpire() { + if (numAttempts > 0 && isExpired()) { removeRequest(); - isExpired = true; future().completeExceptionally(new TimeoutException(requestDescription() + " could not complete before timeout expired.")); } @@ -846,11 +837,12 @@ public class CommitRequestManager implements RequestManager, MemberStateListener NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, - coordinatorRequestManager.coordinator()); + coordinatorRequestManager.coordinator() + ); request.whenComplete( (response, throwable) -> { - long currentTimeMs = request.handler().completionTimeMs(); - handleClientResponse(response, throwable, currentTimeMs); + long completionTimeMs = request.handler().completionTimeMs(); + handleClientResponse(response, throwable, completionTimeMs); }); return request; } @@ -875,10 +867,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener abstract void onResponse(final ClientResponse response); - boolean retryTimeoutExpired(long currentTimeMs) { - return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs; - } - abstract void removeRequest(); } @@ -898,10 +886,10 @@ public class CommitRequestManager implements RequestManager, MemberStateListener public OffsetFetchRequestState(final Set<TopicPartition> partitions, final long retryBackoffMs, final long retryBackoffMaxMs, - final long expirationTimeMs, + final long deadlineMs, final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, - retryBackoffMaxMs, memberInfo, Optional.of(expirationTimeMs)); + retryBackoffMaxMs, memberInfo, deadlineTimer(time, deadlineMs)); this.requestedPartitions = partitions; this.future = new CompletableFuture<>(); } @@ -909,11 +897,11 @@ 
public class CommitRequestManager implements RequestManager, MemberStateListener public OffsetFetchRequestState(final Set<TopicPartition> partitions, final long retryBackoffMs, final long retryBackoffMaxMs, - final long expirationTimeMs, + final long deadlineMs, final double jitter, final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, - retryBackoffMaxMs, jitter, memberInfo, Optional.of(expirationTimeMs)); + retryBackoffMaxMs, jitter, memberInfo, deadlineTimer(time, deadlineMs)); this.requestedPartitions = partitions; this.future = new CompletableFuture<>(); } @@ -1145,9 +1133,10 @@ public class CommitRequestManager implements RequestManager, MemberStateListener inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny(); if (dupe.isPresent() || inflight.isPresent()) { - log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions); + log.debug("Duplicated unsent offset fetch request found for partitions: {}", request.requestedPartitions); dupe.orElseGet(inflight::get).chainFuture(request.future); } else { + log.debug("Enqueuing offset fetch request for partitions: {}", request.requestedPartitions); this.unsentOffsetFetches.add(request); } return request.future; @@ -1165,7 +1154,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener .filter(request -> !request.canSendRequest(currentTimeMs)) .collect(Collectors.toList()); - failAndRemoveExpiredCommitRequests(currentTimeMs); + failAndRemoveExpiredCommitRequests(); // Add all unsent offset commit requests to the unsentRequests list List<NetworkClientDelegate.UnsentRequest> unsentRequests = unsentOffsetCommits.stream() @@ -1179,7 +1168,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener unsentOffsetFetches.stream() .collect(Collectors.partitioningBy(request -> request.canSendRequest(currentTimeMs))); - failAndRemoveExpiredFetchRequests(currentTimeMs); + 
failAndRemoveExpiredFetchRequests(); // Add all sendable offset fetch requests to the unsentRequests list and to the inflightOffsetFetches list for (OffsetFetchRequestState request : partitionedBySendability.get(true)) { @@ -1200,18 +1189,18 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * Find the unsent commit requests that have expired, remove them and complete their * futures with a TimeoutException. */ - private void failAndRemoveExpiredCommitRequests(final long currentTimeMs) { + private void failAndRemoveExpiredCommitRequests() { Queue<OffsetCommitRequestState> requestsToPurge = new LinkedList<>(unsentOffsetCommits); - requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs)); + requestsToPurge.forEach(RetriableRequestState::maybeExpire); } /** * Find the unsent fetch requests that have expired, remove them and complete their * futures with a TimeoutException. */ - private void failAndRemoveExpiredFetchRequests(final long currentTimeMs) { + private void failAndRemoveExpiredFetchRequests() { Queue<OffsetFetchRequestState> requestsToPurge = new LinkedList<>(unsentOffsetFetches); - requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs)); + requestsToPurge.forEach(RetriableRequestState::maybeExpire); } private void clearAll() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 76a550ad719..2aabf4ae130 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -960,7 +960,7 @@ public class MembershipManagerImpl implements MembershipManager { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. 
Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. - commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs)); + commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs)); // Execute commit -> onPartitionsRevoked -> onPartitionsAssigned. commitResult.whenComplete((__, commitReqError) -> { @@ -986,7 +986,7 @@ public class MembershipManagerImpl implements MembershipManager { }); } - long getExpirationTimeForTimeout(final long timeoutMs) { + long getDeadlineMsForTimeout(final long timeoutMs) { long expiration = time.milliseconds() + timeoutMs; if (expiration < 0) { return Long.MAX_VALUE; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 2cd6f6d8530..e2e4d529c00 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -316,11 +316,20 @@ public class NetworkClientDelegate implements AutoCloseable { @Override public String toString() { + String remainingMs; + + if (timer != null) { + timer.update(); + remainingMs = String.valueOf(timer.remainingMs()); + } else { + remainingMs = "<not set>"; + } + return "UnsentRequest{" + "requestBuilder=" + requestBuilder + ", handler=" + handler + ", node=" + node + - ", timer=" + timer + + ", remainingMs=" + remainingMs + '}'; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 75d87432db6..edd8ca97215 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -155,6 +155,7 @@ public class RequestManagers implements Closeable { apiVersions); final TopicMetadataRequestManager topic = new TopicMetadataRequestManager( logContext, + time, config); HeartbeatRequestManager heartbeatRequestManager = null; MembershipManager membershipManager = null; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java new file mode 100644 index 00000000000..c61032cea72 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. 
+ */ +public class TimedRequestState extends RequestState { + + private final Timer timer; + + public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); + this.timer = timer; + } + + public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final int retryBackoffExpBase, + final long retryBackoffMaxMs, + final double jitter, + final Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter); + this.timer = timer; + } + + public boolean isExpired() { + timer.update(); + return timer.isExpired(); + } + + public long remainingMs() { + timer.update(); + return timer.remainingMs(); + } + + public static Timer deadlineTimer(final Time time, final long deadlineMs) { + long diff = Math.max(0, deadlineMs - time.milliseconds()); + return time.timer(diff); + } + + + @Override + protected String toStringBase() { + return super.toStringBase() + ", remainingMs=" + remainingMs(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java index 75a5ed08d15..a555d6ce7f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import java.util.Collections; @@ -61,6 +62,7 @@ import static 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate. */ public class TopicMetadataRequestManager implements RequestManager { + private final Time time; private final boolean allowAutoTopicCreation; private final List<TopicMetadataRequestState> inflightRequests; private final long retryBackoffMs; @@ -68,9 +70,10 @@ public class TopicMetadataRequestManager implements RequestManager { private final Logger log; private final LogContext logContext; - public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { + public TopicMetadataRequestManager(final LogContext context, final Time time, final ConsumerConfig config) { logContext = context; log = logContext.logger(getClass()); + this.time = time; inflightRequests = new LinkedList<>(); retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); @@ -81,7 +84,7 @@ public class TopicMetadataRequestManager implements RequestManager { public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { // Prune any requests which have timed out List<TopicMetadataRequestState> expiredRequests = inflightRequests.stream() - .filter(req -> req.isExpired(currentTimeMs)) + .filter(TimedRequestState::isExpired) .collect(Collectors.toList()); expiredRequests.forEach(TopicMetadataRequestState::expire); @@ -99,10 +102,10 @@ public class TopicMetadataRequestManager implements RequestManager { * * @return the future of the metadata request. 
*/ - public CompletableFuture<Map<String, List<PartitionInfo>>> requestAllTopicsMetadata(final long expirationTimeMs) { + public CompletableFuture<Map<String, List<PartitionInfo>>> requestAllTopicsMetadata(final long deadlineMs) { TopicMetadataRequestState newRequest = new TopicMetadataRequestState( logContext, - expirationTimeMs, + deadlineMs, retryBackoffMs, retryBackoffMaxMs); inflightRequests.add(newRequest); @@ -115,11 +118,11 @@ public class TopicMetadataRequestManager implements RequestManager { * @param topic to be requested. * @return the future of the metadata request. */ - public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final String topic, final long expirationTimeMs) { + public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final String topic, final long deadlineMs) { TopicMetadataRequestState newRequest = new TopicMetadataRequestState( logContext, topic, - expirationTimeMs, + deadlineMs, retryBackoffMs, retryBackoffMaxMs); inflightRequests.add(newRequest); @@ -131,35 +134,32 @@ public class TopicMetadataRequestManager implements RequestManager { return inflightRequests; } - class TopicMetadataRequestState extends RequestState { + class TopicMetadataRequestState extends TimedRequestState { private final String topic; private final boolean allTopics; - private final long expirationTimeMs; CompletableFuture<Map<String, List<PartitionInfo>>> future; public TopicMetadataRequestState(final LogContext logContext, - final long expirationTimeMs, + final long deadlineMs, final long retryBackoffMs, final long retryBackoffMaxMs) { super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs, - retryBackoffMaxMs); + retryBackoffMaxMs, deadlineTimer(time, deadlineMs)); future = new CompletableFuture<>(); this.topic = null; this.allTopics = true; - this.expirationTimeMs = expirationTimeMs; } public TopicMetadataRequestState(final LogContext logContext, final String topic, - final long 
expirationTimeMs, + final long deadlineMs, final long retryBackoffMs, final long retryBackoffMaxMs) { super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs, - retryBackoffMaxMs); + retryBackoffMaxMs, deadlineTimer(time, deadlineMs)); future = new CompletableFuture<>(); this.topic = topic; this.allTopics = false; - this.expirationTimeMs = expirationTimeMs; } /** @@ -167,10 +167,6 @@ public class TopicMetadataRequestManager implements RequestManager { * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} if needed. */ private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTimeMs) { - if (currentTimeMs >= expirationTimeMs) { - return Optional.empty(); - } - if (!canSendRequest(currentTimeMs)) { return Optional.empty(); } @@ -183,10 +179,6 @@ public class TopicMetadataRequestManager implements RequestManager { return Optional.of(createUnsentRequest(request)); } - private boolean isExpired(final long currentTimeMs) { - return currentTimeMs >= expirationTimeMs; - } - private void expire() { completeFutureAndRemoveRequest( new TimeoutException("Timeout expired while fetching topic metadata")); @@ -210,9 +202,8 @@ public class TopicMetadataRequestManager implements RequestManager { private void handleError(final Throwable exception, final long completionTimeMs) { if (exception instanceof RetriableException) { - if (completionTimeMs >= expirationTimeMs) { - completeFutureAndRemoveRequest( - new TimeoutException("Timeout expired while fetching topic metadata")); + if (isExpired()) { + completeFutureAndRemoveRequest(new TimeoutException("Timeout expired while fetching topic metadata")); } else { onFailedAttempt(completionTimeMs); } @@ -222,20 +213,12 @@ public class TopicMetadataRequestManager implements RequestManager { } private void handleResponse(final ClientResponse response) { - long responseTimeMs = response.receivedTimeMs(); try { Map<String, List<PartitionInfo>> res = 
handleTopicMetadataResponse((MetadataResponse) response.responseBody()); future.complete(res); inflightRequests.remove(this); - } catch (RetriableException e) { - if (responseTimeMs >= expirationTimeMs) { - completeFutureAndRemoveRequest( - new TimeoutException("Timeout expired while fetching topic metadata")); - } else { - onFailedAttempt(responseTimeMs); - } - } catch (Exception t) { - completeFutureAndRemoveRequest(t); + } catch (Exception e) { + handleError(e, response.receivedTimeMs()); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java index dffac129021..8cd17d19feb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java @@ -43,6 +43,7 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im return future; } + @Override public long deadlineMs() { return deadlineMs; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index b1db0297a12..8e61d61cbc5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -193,11 +193,11 @@ public class CommitRequestManagerTest { offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L)); // Add the requests to the CommitRequestManager and store their futures - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - commitManager.commitSync(offsets1, expirationTimeMs); - 
commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), expirationTimeMs); - commitManager.commitSync(offsets2, expirationTimeMs); - commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), expirationTimeMs); + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; + commitManager.commitSync(offsets1, deadlineMs); + commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs); + commitManager.commitSync(offsets2, deadlineMs); + commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), deadlineMs); // Poll the CommitRequestManager and verify that the inflightOffsetFetches size is correct NetworkClientDelegate.PollResult result = commitManager.poll(time.milliseconds()); @@ -287,8 +287,8 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap( new TopicPartition("topic", 1), new OffsetAndMetadata(0)); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; + CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, error, commitResult); // We expect that request should have been retried on this sync commit. @@ -307,8 +307,8 @@ public class CommitRequestManagerTest { new OffsetAndMetadata(0)); // Send sync offset commit that fails and verify it propagates the expected exception. 
- long expirationTimeMs = time.milliseconds() + retryBackoffMs; - CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); + long deadlineMs = time.milliseconds() + retryBackoffMs; + CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); completeOffsetCommitRequestWithError(commitRequestManager, commitError); assertFutureThrows(commitResult, expectedException); } @@ -332,8 +332,8 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap( new TopicPartition("topic", 1), new OffsetAndMetadata(0)); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; + CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.UNKNOWN_MEMBER_ID); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); @@ -594,7 +594,7 @@ public class CommitRequestManagerTest { @ParameterizedTest @MethodSource("offsetFetchExceptionSupplier") - public void testOffsetFetchRequestErroredRequests(final Errors error, final boolean isRetriable) { + public void testOffsetFetchRequestErroredRequests(final Errors error) { CommitRequestManager commitRequestManager = create(true, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); @@ -606,7 +606,7 @@ public class CommitRequestManagerTest { 1, error); // we only want to make sure to purge the outbound buffer for non-retriables, so retriable will be re-queued. 
- if (isRetriable) + if (isRetriableOnOffsetFetch(error)) testRetriable(commitRequestManager, futures); else { testNonRetriable(futures); @@ -614,15 +614,49 @@ public class CommitRequestManagerTest { } } + @ParameterizedTest + @MethodSource("offsetFetchExceptionSupplier") + public void testOffsetFetchRequestTimeoutRequests(final Errors error) { + CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + Set<TopicPartition> partitions = new HashSet<>(); + partitions.add(new TopicPartition("t1", 0)); + List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests( + commitRequestManager, + partitions, + 1, + error); + + if (isRetriableOnOffsetFetch(error)) { + futures.forEach(f -> assertFalse(f.isDone())); + + // Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each + // OffsetFetchRequestState is evaluated via isExpired(). 
+ time.sleep(defaultApiTimeoutMs); + assertFalse(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); + commitRequestManager.poll(time.milliseconds()); + futures.forEach(f -> assertFutureThrows(f, TimeoutException.class)); + assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); + } else { + futures.forEach(f -> assertFutureThrows(f, KafkaException.class)); + assertEmptyPendingRequests(commitRequestManager); + } + } + + private boolean isRetriableOnOffsetFetch(Errors error) { + return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE; + } + @Test public void testSuccessfulOffsetFetch() { CommitRequestManager commitManager = create(false, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), - expirationTimeMs); + deadlineMs); // Send fetch request NetworkClientDelegate.PollResult result = commitManager.poll(time.milliseconds()); @@ -667,8 +701,8 @@ public class CommitRequestManagerTest { Set<TopicPartition> partitions = new HashSet<>(); partitions.add(new TopicPartition("t1", 0)); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = commitRequestManager.fetchOffsets(partitions, expirationTimeMs); + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = commitRequestManager.fetchOffsets(partitions, deadlineMs); completeOffsetFetchRequestWithError(commitRequestManager, partitions, error); @@ -694,8 +728,8 @@ public class CommitRequestManagerTest { Set<TopicPartition> partitions = 
new HashSet<>(); partitions.add(new TopicPartition("t1", 0)); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = commitRequestManager.fetchOffsets(partitions, expirationTimeMs); + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = commitRequestManager.fetchOffsets(partitions, deadlineMs); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -748,8 +782,8 @@ public class CommitRequestManagerTest { new OffsetAndMetadata(0)); // Send sync offset commit request that fails with retriable error. - long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; - CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); + long deadlineMs = time.milliseconds() + retryBackoffMs * 2; + CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); // Request retried after backoff, and fails with retriable again. Should not complete yet @@ -770,8 +804,9 @@ public class CommitRequestManagerTest { * Sync commit requests that fail with an expected retriable error should be retried * while there is time. When time expires, they should fail with a TimeoutException. 
*/ - @Test - public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires() { + @ParameterizedTest + @MethodSource("offsetCommitExceptionSupplier") + public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires(final Errors error) { CommitRequestManager commitRequestManager = create(false, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); @@ -780,17 +815,21 @@ public class CommitRequestManagerTest { new OffsetAndMetadata(0)); // Send offset commit request that fails with retriable error. - long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; - CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, expirationTimeMs); - completeOffsetCommitRequestWithError(commitRequestManager, Errors.COORDINATOR_NOT_AVAILABLE); + long deadlineMs = time.milliseconds() + retryBackoffMs * 2; + CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); + completeOffsetCommitRequestWithError(commitRequestManager, error); // Sleep to expire the request timeout. Request should fail on the next poll with a // TimeoutException. 
- time.sleep(expirationTimeMs); + time.sleep(deadlineMs); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); assertTrue(commitResult.isDone()); - assertFutureThrows(commitResult, TimeoutException.class); + + if (error.exception() instanceof RetriableException) + assertFutureThrows(commitResult, TimeoutException.class); + else + assertFutureThrows(commitResult, KafkaException.class); } /** @@ -829,8 +868,8 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0)); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - commitRequestManager.commitSync(offsets, expirationTimeMs); + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; + commitRequestManager.commitSync(offsets, deadlineMs); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException()); @@ -911,8 +950,8 @@ public class CommitRequestManagerTest { when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); // Send request that is expected to fail with invalid epoch. - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - commitRequestManager.fetchOffsets(partitions, expirationTimeMs); + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; + commitRequestManager.fetchOffsets(partitions, deadlineMs); // Mock member has new a valid epoch. int newEpoch = 8; @@ -950,9 +989,9 @@ public class CommitRequestManagerTest { when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); // Send request that is expected to fail with invalid epoch. 
- long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestResult = - commitRequestManager.fetchOffsets(partitions, expirationTimeMs); + commitRequestManager.fetchOffsets(partitions, deadlineMs); // Mock member not having a valid epoch anymore (left/failed/fenced). commitRequestManager.onMemberEpochUpdated(Optional.empty(), Optional.empty()); @@ -983,10 +1022,10 @@ public class CommitRequestManagerTest { TopicPartition tp = new TopicPartition("topic", 1); subscriptionState.assignFromUser(singleton(tp)); subscriptionState.seek(tp, 5); - long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; + long deadlineMs = time.milliseconds() + retryBackoffMs * 2; // Send commit request expected to be retried on STALE_MEMBER_EPOCH error while it does not expire - commitRequestManager.maybeAutoCommitSyncBeforeRevocation(expirationTimeMs); + commitRequestManager.maybeAutoCommitSyncBeforeRevocation(deadlineMs); int newEpoch = 8; String memberId = "member1"; @@ -1094,7 +1133,7 @@ public class CommitRequestManagerTest { } /** - * @return {@link Errors} that could be received in OffsetCommit responses. + * @return {@link Errors} that could be received in {@link ApiKeys#OFFSET_COMMIT} responses. */ private static Stream<Arguments> offsetCommitExceptionSupplier() { return Stream.of( @@ -1113,25 +1152,27 @@ public class CommitRequestManagerTest { Arguments.of(Errors.UNKNOWN_MEMBER_ID)); } - // Supplies (error, isRetriable) + /** + * @return {@link Errors} that could be received in {@link ApiKeys#OFFSET_FETCH} responses. 
+ */ private static Stream<Arguments> offsetFetchExceptionSupplier() { - // fetchCommit is only retrying on a subset of RetriableErrors return Stream.of( - Arguments.of(Errors.NOT_COORDINATOR, true), - Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true), - Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false), - Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false), - Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false), - Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false), - Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false), - Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true), - Arguments.of(Errors.REQUEST_TIMED_OUT, false), - Arguments.of(Errors.FENCED_INSTANCE_ID, false), - Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false), - Arguments.of(Errors.UNKNOWN_MEMBER_ID, false), + Arguments.of(Errors.NOT_COORDINATOR), + Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS), + Arguments.of(Errors.UNKNOWN_SERVER_ERROR), + Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED), + Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE), + Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE), + Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION), + Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE), + Arguments.of(Errors.REQUEST_TIMED_OUT), + Arguments.of(Errors.FENCED_INSTANCE_ID), + Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED), + Arguments.of(Errors.UNKNOWN_MEMBER_ID), // Adding STALE_MEMBER_EPOCH as non-retriable here because it is only retried if a new // member epoch is received. Tested separately. 
- Arguments.of(Errors.STALE_MEMBER_EPOCH, false)); + Arguments.of(Errors.STALE_MEMBER_EPOCH), + Arguments.of(Errors.UNSTABLE_OFFSET_COMMIT)); } /** @@ -1155,9 +1196,9 @@ public class CommitRequestManagerTest { TopicPartition tp2 = new TopicPartition("t2", 3); partitions.add(tp1); partitions.add(tp2); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = - commitRequestManager.fetchOffsets(partitions, expirationTimeMs); + commitRequestManager.fetchOffsets(partitions, deadlineMs); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -1215,9 +1256,9 @@ public class CommitRequestManagerTest { int numRequest, final Errors error) { List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<>(); - long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; + long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; for (int i = 0; i < numRequest; i++) { - futures.add(commitRequestManager.fetchOffsets(partitions, expirationTimeMs)); + futures.add(commitRequestManager.fetchOffsets(partitions, deadlineMs)); } NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 9f6fd4a764b..dabd697b896 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -174,7 +174,7 @@ public class ConsumerTestBuilder implements Closeable { backgroundEventHandler, logContext)); - this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager(logContext, 
config)); + this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager(logContext, time, config)); if (groupInfo.isPresent()) { GroupInformation gi = groupInfo.get(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java new file mode 100644 index 00000000000..ddde3ae84d4 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TimedRequestStateTest { + + private final static long DEFAULT_TIMEOUT_MS = 30000; + private final Time time = new MockTime(); + + @Test + public void testIsExpired() { + TimedRequestState state = new TimedRequestState( + new LogContext(), + this.getClass().getSimpleName(), + 100, + 1000, + time.timer(DEFAULT_TIMEOUT_MS) + ); + assertFalse(state.isExpired()); + time.sleep(DEFAULT_TIMEOUT_MS); + assertTrue(state.isExpired()); + } + + @Test + public void testRemainingMs() { + TimedRequestState state = new TimedRequestState( + new LogContext(), + this.getClass().getSimpleName(), + 100, + 1000, + time.timer(DEFAULT_TIMEOUT_MS) + ); + assertEquals(DEFAULT_TIMEOUT_MS, state.remainingMs()); + time.sleep(DEFAULT_TIMEOUT_MS); + assertEquals(0, state.remainingMs()); + } + + @Test + public void testDeadlineTimer() { + long deadlineMs = time.milliseconds() + DEFAULT_TIMEOUT_MS; + Timer timer = TimedRequestState.deadlineTimer(time, deadlineMs); + assertEquals(DEFAULT_TIMEOUT_MS, timer.remainingMs()); + timer.sleep(DEFAULT_TIMEOUT_MS); + assertEquals(0, timer.remainingMs()); + } + + @Test + public void testAllowOverdueDeadlineTimer() { + long deadlineMs = time.milliseconds() - DEFAULT_TIMEOUT_MS; + Timer timer = TimedRequestState.deadlineTimer(time, deadlineMs); + assertEquals(0, timer.remainingMs()); + } + + @Test + public void testToStringUpdatesTimer() { + TimedRequestState state = new TimedRequestState( + new LogContext(), + this.getClass().getSimpleName(), + 100, + 1000, + 
time.timer(DEFAULT_TIMEOUT_MS) + ); + + assertToString(state, DEFAULT_TIMEOUT_MS); + time.sleep(DEFAULT_TIMEOUT_MS); + assertToString(state, 0); + } + + private void assertToString(TimedRequestState state, long timerMs) { + assertTrue(state.toString().contains("remainingMs=" + timerMs + "}")); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java index 3f2b2c3d983..56eff5b4f4b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java @@ -74,6 +74,7 @@ public class TopicMetadataRequestManagerTest { props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager( new LogContext(), + time, new ConsumerConfig(props))); } diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala index d242ea105e6..d29f05b36b3 100644 --- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.TopicPartition +import org.apache.kafka.test.{TestUtils => JTestUtils} import kafka.utils.TestUtils import kafka.server.BaseRequestTest import org.junit.jupiter.api.Assertions._ @@ -90,12 +91,14 @@ abstract class AbstractConsumerTest extends BaseRequestTest { s"The current assignment is ${consumer.assignment()}") } - def awaitNonEmptyRecords[K, V](consumer: Consumer[K, V], partition: 
TopicPartition): ConsumerRecords[K, V] = { + def awaitNonEmptyRecords[K, V](consumer: Consumer[K, V], + partition: TopicPartition, + pollTimeoutMs: Long = 100): ConsumerRecords[K, V] = { TestUtils.pollRecordsUntilTrue(consumer, (polledRecords: ConsumerRecords[K, V]) => { if (polledRecords.records(partition).asScala.nonEmpty) return polledRecords false - }, s"Consumer did not consume any messages for partition $partition before timeout.") + }, s"Consumer did not consume any messages for partition $partition before timeout.", JTestUtils.DEFAULT_MAX_WAIT_MS, pollTimeoutMs) throw new IllegalStateException("Should have timed out before reaching here") } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala index daed397e43f..0184a6eea67 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala @@ -238,6 +238,19 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { runMultiConsumerSessionTimeoutTest(true) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = { + val numMessages = 100 + val producer = createProducer() + sendRecords(producer, numMessages, tp) + + val consumer = createConsumer() + consumer.subscribe(Set(topic).asJava) + val records = awaitNonEmptyRecords(consumer, tp, 0L) + assertEquals(numMessages, records.count()) + } + def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = { // use consumers defined in this class plus one additional consumer // Use topic defined in this class + one additional topic diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 31f671e3a0f..35b7ce418d1 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -903,9 +903,10 @@ object TestUtils extends Logging { def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V], action: ConsumerRecords[K, V] => Boolean, msg: => String, - waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = { + waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, + pollTimeoutMs: Long = 100): Unit = { waitUntilTrue(() => { - val records = consumer.poll(Duration.ofMillis(100)) + val records = consumer.poll(Duration.ofMillis(pollTimeoutMs)) action(records) }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs) } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java index 059e33f3525..91455f86232 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java @@ -208,6 +208,7 @@ public final class TieredStorageTestContext implements AutoCloseable { consumer.seek(topicPartition, fetchOffset); long timeoutMs = 60_000L; + long pollTimeoutMs = 100L; String sep = System.lineSeparator(); List<ConsumerRecord<String, String>> records = new ArrayList<>(); Function1<ConsumerRecords<String, String>, Object> pollAction = polledRecords -> { @@ -218,7 +219,7 @@ public final class TieredStorageTestContext implements AutoCloseable { String.format("Could not consume %d records of %s from offset %d in %d ms. 
%d message(s) consumed:%s%s", expectedTotalCount, topicPartition, fetchOffset, timeoutMs, records.size(), sep, records.stream().map(Object::toString).collect(Collectors.joining(sep))); - TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, timeoutMs); + TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, timeoutMs, pollTimeoutMs); return records; }