This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 77ba06fa620 KAFKA-16033: Commit retry logic fixes (#15357) 77ba06fa620 is described below commit 77ba06fa620ad9cc42af92f42354661308676135 Author: Lianet Magrans <98415067+lian...@users.noreply.github.com> AuthorDate: Wed Feb 21 05:08:37 2024 -0500 KAFKA-16033: Commit retry logic fixes (#15357) This change modifies the commit manager to improve retry logic and fix bugs: - defines high-level functions for each of the different types of commit: commitSync, commitAsync, autoCommitSync (used from consumer close), autoCommitAsync (on interval), autoCommitNow (before revocation). - moves retry logic to these caller functions, keeping common response error handling that propagates errors, which each caller function retries as needed. Fixes the following issues: - auto-commit before revocation should retry with the latest consumed offsets - auto-commit before revocation should only reset the timer once, when the rebalance completes - StaleMemberEpoch error (fatal) is considered retriable only when committing offsets before revocation, where it is retried with backoff if the member has a valid epoch. All other commits will fail fatally on stale epoch. 
Note that auto commit on the interval (autoCommitAsync) does not have any specific retry logic for the stale epoch, but will effectively retry on the next interval (as it does for any other fatal error) - fix duplicated and noisy logs for auto-commit Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- .../consumer/internals/AsyncKafkaConsumer.java | 24 +- .../consumer/internals/CommitRequestManager.java | 669 +++++++++++---------- .../consumer/internals/MembershipManagerImpl.java | 12 +- .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventProcessor.java | 32 +- .../events/AsyncCommitApplicationEvent.java | 39 ++ .../internals/events/CommitApplicationEvent.java | 34 +- .../events/SyncCommitApplicationEvent.java | 52 ++ .../consumer/internals/AsyncKafkaConsumerTest.java | 71 ++- .../internals/CommitRequestManagerTest.java | 291 +++++---- .../internals/ConsumerNetworkThreadTest.java | 26 +- .../internals/MembershipManagerImplTest.java | 4 +- 12 files changed, 719 insertions(+), 537 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 6781e0b73cb..d2e4788a1bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -40,6 +40,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; @@ -57,6 +58,7 @@ import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdat import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; @@ -755,9 +757,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { acquireAndEnsureOpen(); try { - // Commit without timer to indicate that the commit should be triggered without - // waiting for a response. 
- CompletableFuture<Void> future = commit(offsets, false, Optional.empty()); + AsyncCommitApplicationEvent asyncCommitEvent = new AsyncCommitApplicationEvent(offsets); + CompletableFuture<Void> future = commit(asyncCommitEvent); future.whenComplete((r, t) -> { if (t == null) { @@ -778,14 +779,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { } } - // Visible for testing - CompletableFuture<Void> commit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final boolean isWakeupable, - final Optional<Long> retryTimeoutMs) { + private CompletableFuture<Void> commit(final CommitApplicationEvent commitEvent) { maybeInvokeCommitCallbacks(); maybeThrowFencedInstanceException(); maybeThrowInvalidGroupIdException(); + Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets(); log.debug("Committing offsets: {}", offsets); offsets.forEach(this::updateLastSeenEpochIfNewer); @@ -793,11 +792,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { return CompletableFuture.completedFuture(null); } - final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets, retryTimeoutMs); - if (isWakeupable) { - // the task can only be woken up if the top level API call is commitSync - wakeupTrigger.setActiveTask(commitEvent.future()); - } applicationEventHandler.add(commitEvent); return commitEvent.future(); } @@ -1344,9 +1338,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { long commitStart = time.nanoseconds(); try { Timer requestTimer = time.timer(timeout.toMillis()); - // Commit with a timer to control how long the request should be retried until it - // gets a successful response or non-retriable error. 
- CompletableFuture<Void> commitFuture = commit(offsets, true, Optional.of(timeout.toMillis())); + SyncCommitApplicationEvent syncCommitEvent = new SyncCommitApplicationEvent(offsets, timeout.toMillis()); + CompletableFuture<Void> commitFuture = commit(syncCommitEvent); + wakeupTrigger.setActiveTask(commitFuture); ConsumerUtils.getResult(commitFuture, requestTimer); interceptors.onCommit(offsets); } finally { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 704da62178c..9206783d561 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnstableOffsetCommitException; @@ -168,7 +169,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener return drainPendingOffsetCommitRequests(); } - maybeAutoCommitAllConsumedAsync(); + maybeAutoCommitAsync(); if (!pendingRequests.hasUnsentRequests()) return EMPTY; @@ -204,126 +205,313 @@ public class CommitRequestManager implements RequestManager, MemberStateListener } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. 
This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) - .whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. 
+ * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. + resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. 
+ */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + autoCommitSyncNowWithRetries(requestState, result); + return result; } - private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { + private void autoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt); + commitAttempt.whenComplete((committedOffsets, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.debug("Auto-commit sync timed out and won't be retried anymore"); + result.completeExceptionally(error); + } else { + // Make sure the auto-commit is retries with the latest offsets + requestAttempt.offsets = subscriptions.allConsumed(); + requestAttempt.resetFuture(); + autoCommitSyncNowWithRetries(requestAttempt, result); + } + } else { + log.debug("Auto-commit sync failed with non-retriable error", error); + result.completeExceptionally(error); + } + } + }); + } + + /** + * Clear the inflight auto-commit flag and log auto-commit completion status. + */ + private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? 
super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); - log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); + log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { - log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", + log.debug("Auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, throwable.getMessage()); } else { - log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable); + log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable); } }; } /** - * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an - * {@link OffsetCommitRequestState} and enqueue it to send later. + * Generate a request to commit offsets without retrying, even if it fails with a retriable + * error. The generated request will be added to the queue to be sent on the next call to + * {@link #poll(long)}. + * + * @param offsets Offsets to commit per partition. + * @return Future that will complete when a response is received, successfully or + * exceptionally depending on the response. If the request fails with a retriable error, the + * future will be completed with a {@link RetriableCommitFailedException}. 
*/ - public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { + public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (offsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(null); } - return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future; + OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty()); + pendingRequests.addOffsetCommitRequest(commitRequest); + + CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>(); + commitRequest.future.whenComplete((committedOffsets, error) -> { + if (error != null) { + asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error)); + } else { + asyncCommitResult.complete(null); + } + }); + return asyncCommitResult; } /** - * Handles {@link org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent}. It creates an - * {@link OffsetFetchRequestState} and enqueue it to send later. + * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired. + * + * @param offsets Offsets to commit + * @param retryExpirationTimeMs Time until which the request will be retried if it fails with + * an expected retriable error. 
+ * @return Future that will complete when a successful response */ - public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest( + public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, + final long retryExpirationTimeMs) { + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = createOffsetCommitRequest( + offsets, + Optional.of(retryExpirationTimeMs)); + commitSyncWithRetries(requestState, result); + return result; + } + + private OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Long> expirationTimeMs) { + return jitter.isPresent() ? + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + jitter.getAsDouble(), + memberInfo) : + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + memberInfo); + } + + private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + pendingRequests.addOffsetCommitRequest(requestAttempt); + + // Retry the same commit request while it fails with RetriableException and the retry + // timeout hasn't expired. 
+ requestAttempt.future.whenComplete((res, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.info("OffsetCommit timeout expired so it won't be retried anymore"); + result.completeExceptionally(error); + } else { + requestAttempt.resetFuture(); + commitSyncWithRetries(requestAttempt, result); + } + } else { + result.completeExceptionally(commitSyncExceptionForError(error)); + } + } + }); + } + + private Throwable commitSyncExceptionForError(Throwable error) { + if (error instanceof StaleMemberEpochException) { + return new CommitFailedException("OffsetCommit failed with stale member epoch." + + Errors.STALE_MEMBER_EPOCH.message()); + } + return error; + } + + private Throwable commitAsyncExceptionForError(Throwable error) { + if (error instanceof RetriableException) { + return new RetriableCommitFailedException(error.getMessage()); + } + return error; + } + + /** + * Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}. + * + * @param partitions Partitions to fetch offsets for. + * @param expirationTimeMs Time until which the request should be retried if it fails + * with expected retriable errors. + * @return Future that will complete when a successful response is received, or the request + * fails and cannot be retried. Note that the request is retried whenever it fails with + * retriable expected error and the retry time hasn't expired. 
+ */ + public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets( final Set<TopicPartition> partitions, final long expirationTimeMs) { - return pendingRequests.addOffsetFetchRequest(partitions, expirationTimeMs); + if (partitions.isEmpty()) { + return CompletableFuture.completedFuture(Collections.emptyMap()); + } + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>(); + OffsetFetchRequestState request = createOffsetFetchRequest(partitions, expirationTimeMs); + fetchOffsetsWithRetries(request, result); + return result; + } + + private OffsetFetchRequestState createOffsetFetchRequest(final Set<TopicPartition> partitions, + final long expirationTimeMs) { + return jitter.isPresent() ? + new OffsetFetchRequestState( + partitions, + retryBackoffMs, + retryBackoffMaxMs, + expirationTimeMs, + jitter.getAsDouble(), + memberInfo) : + new OffsetFetchRequestState( + partitions, + retryBackoffMs, + retryBackoffMaxMs, + expirationTimeMs, + memberInfo); + } + + private void fetchOffsetsWithRetries(final OffsetFetchRequestState fetchRequest, + final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> currentResult = pendingRequests.addOffsetFetchRequest(fetchRequest); + + // Retry the same fetch request while it fails with RetriableException and the retry timeout hasn't expired. 
+ currentResult.whenComplete((res, error) -> { + boolean inflightRemoved = pendingRequests.inflightOffsetFetches.remove(fetchRequest); + if (!inflightRemoved) { + log.warn("A duplicated, inflight, request was identified, but unable to find it in the " + + "outbound buffer:" + fetchRequest); + } + if (error == null) { + result.complete(res); + } else { + if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { + if (error instanceof TimeoutException && fetchRequest.isExpired) { + result.completeExceptionally(error); + } else { + fetchRequest.resetFuture(); + fetchOffsetsWithRetries(fetchRequest, result); + } + } else + result.completeExceptionally(error); + } + }); + } + + private boolean isStaleEpochErrorAndValidEpochAvailable(Throwable error) { + return error instanceof StaleMemberEpochException && memberInfo.memberEpoch.isPresent(); } public void updateAutoCommitTimer(final long currentTimeMs) { @@ -393,18 +581,15 @@ public class CommitRequestManager implements RequestManager, MemberStateListener } private class OffsetCommitRequestState extends RetriableRequestState { - private final Map<TopicPartition, OffsetAndMetadata> offsets; + private Map<TopicPartition, OffsetAndMetadata> offsets; private final String groupId; private final Optional<String> groupInstanceId; - private final CompletableFuture<Void> future; - /** - * Time until which the request should be retried if it fails with retriable - * errors. If not present, the request is triggered without waiting for a response or - * retrying. + * Future containing the offsets that were committed. It completes when a response is + * received for the commit request. 
*/ - private final Optional<Long> expirationTimeMs; + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> offsets, final String groupId, @@ -412,15 +597,13 @@ public class CommitRequestManager implements RequestManager, MemberStateListener final Optional<Long> expirationTimeMs, final long retryBackoffMs, final long retryBackoffMaxMs, - final MemberInfo memberInfo, - final boolean retryOnStaleEpoch) { + final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, - retryBackoffMaxMs, memberInfo, retryOnStaleEpoch); + retryBackoffMaxMs, memberInfo, expirationTimeMs); this.offsets = offsets; this.groupId = groupId; this.groupInstanceId = groupInstanceId; this.future = new CompletableFuture<>(); - this.expirationTimeMs = expirationTimeMs; } // Visible for testing @@ -431,15 +614,13 @@ public class CommitRequestManager implements RequestManager, MemberStateListener final long retryBackoffMs, final long retryBackoffMaxMs, final double jitter, - final MemberInfo memberInfo, - final boolean retryOnStaleEpoch) { + final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, - retryBackoffMaxMs, jitter, memberInfo, retryOnStaleEpoch); + retryBackoffMaxMs, jitter, memberInfo, expirationTimeMs); this.offsets = offsets; this.groupId = groupId; this.groupInstanceId = groupInstanceId; this.future = new CompletableFuture<>(); - this.expirationTimeMs = expirationTimeMs; } public NetworkClientDelegate.UnsentRequest toUnsentRequest() { @@ -494,14 +675,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex()); - OffsetAndMetadata offsetAndMetadata = offsets.get(tp); - long offset = offsetAndMetadata.offset(); + Errors error = Errors.forCode(partition.errorCode()); if (error == Errors.NONE) { + OffsetAndMetadata offsetAndMetadata = offsets.get(tp); + long offset = offsetAndMetadata.offset(); log.debug("OffsetCommit completed successfully for offset {} partition {}", offset, tp); continue; } + onFailedAttempt(currentTimeMs); if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); return; @@ -509,11 +692,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) { coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs); - maybeRetry(currentTimeMs, error.exception()); + future.completeExceptionally(error.exception()); return; } else if (error == Errors.FENCED_INSTANCE_ID) { String fencedError = "OffsetCommit failed due to group instance id fenced: " + groupInstanceId; - log.error(fencedError, error.message()); + log.error(fencedError); future.completeExceptionally(new CommitFailedException(fencedError)); return; } else if (error == Errors.OFFSET_METADATA_TOO_LARGE || @@ -523,21 +706,15 @@ public class CommitRequestManager implements RequestManager, MemberStateListener } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { // just retry - maybeRetry(currentTimeMs, error.exception()); + future.completeExceptionally(error.exception()); return; } else if (error == Errors.UNKNOWN_MEMBER_ID) { - log.error("OffsetCommit failed with {} on partition {} for offset {}", - error, tp, offset); + log.error("OffsetCommit failed with {}", error); future.completeExceptionally(new CommitFailedException("OffsetCommit " + "failed with unknown member ID. 
" + error.message())); return; } else if (error == Errors.STALE_MEMBER_EPOCH) { - if (maybeRetryWithNewMemberEpoch(currentTimeMs, error)) { - log.debug("OffsetCommit failed with {} and will be retried with the " + - "latest member ID and epoch.", error); - return; - } - future.completeExceptionally(commitExceptionForStaleMemberEpoch()); + future.completeExceptionally(error.exception()); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { // Collect all unauthorized topics before failing @@ -559,31 +736,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener } } - /** - * Enqueue the request to be retried with exponential backoff, if the request allows - * retries and the timer has not expired. Complete the request future exceptionally if - * the request won't be retried. - */ - @Override - void maybeRetry(long currentTimeMs, Throwable throwable) { - if (!allowsRetries()) { - // Fail requests that do not allow retries (async requests), making sure to - // propagate a RetriableCommitException if the failure is retriable. - future.completeExceptionally(commitExceptionForRetriableError(throwable)); - return; - } - if (isExpired(currentTimeMs)) { - // Fail requests that allowed retries (sync requests), but expired. - future.completeExceptionally(throwable); - return; - } - - // Enqueue request to be retried with backoff. Note that this maintains the same - // timer of the initial request, so all the retries are time-bounded. 
- onFailedAttempt(currentTimeMs); - pendingRequests.addOffsetCommitRequest(this); - } - @Override String requestDescription() { return "OffsetCommit request for offsets " + offsets; @@ -594,49 +746,15 @@ public class CommitRequestManager implements RequestManager, MemberStateListener return future; } - private boolean isExpired(final long currentTimeMs) { - return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs; + void resetFuture() { + future = new CompletableFuture<>(); } - /** - * @return True if the requests allows to be retried (sync requests that provide an - * expiration time to bound the retries). False if the request does not allow to be - * retried on RetriableErrors (async requests that does not provide an expiration time - * for retries) - */ - private boolean allowsRetries() { - return expirationTimeMs.isPresent(); - } - - /** - * Complete the request future with a TimeoutException if the request expired. No action - * taken if the request is still active. - * - * @return True if the request expired. - */ - private boolean maybeExpire(final long currentTimeMs) { - if (isExpired(currentTimeMs)) { - future.completeExceptionally(new TimeoutException("OffsetCommit could not complete " + - "before timeout expired.")); - return true; + @Override + void removeRequest() { + if (!unsentOffsetCommitRequests().remove(this)) { + log.warn("OffsetCommit request to remove not found in the outbound buffer: {}", this); } - return false; - } - - /** - * @return A RetriableCommitFailedException for async commit requests if the original - * Exception was a RetriableException. Return the original one in any other case. 
- */ - private Throwable commitExceptionForRetriableError(Throwable throwable) { - if (!allowsRetries() && throwable instanceof RetriableException) - return new RetriableCommitFailedException(throwable); - return throwable; - } - - private Throwable commitExceptionForStaleMemberEpoch() { - if (retryOnStaleEpoch) - return new RetriableCommitFailedException(Errors.STALE_MEMBER_EPOCH.exception()); - return new CommitFailedException("OffsetCommit failed with stale member epoch." + Errors.STALE_MEMBER_EPOCH.message()); } } @@ -652,45 +770,34 @@ public class CommitRequestManager implements RequestManager, MemberStateListener final MemberInfo memberInfo; /** - * True if the request should be retried if it fails with {@link Errors#STALE_MEMBER_EPOCH}. + * Time until which the request should be retried if it fails with retriable + * errors. If not present, the request is triggered without waiting for a response or + * retrying. */ - boolean retryOnStaleEpoch; + private final Optional<Long> expirationTimeMs; + + /** + * True if the request expiration time has been reached. This is set when validating the + * request expiration on {@link #poll(long)} before sending it. It is used to know if a + * request should be retried on TimeoutException. 
+ */ + boolean isExpired; RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, - long retryBackoffMaxMs, MemberInfo memberInfo, boolean retryOnStaleEpoch) { + long retryBackoffMaxMs, MemberInfo memberInfo, Optional<Long> expirationTimeMs) { super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); this.memberInfo = memberInfo; - this.retryOnStaleEpoch = retryOnStaleEpoch; + this.expirationTimeMs = expirationTimeMs; } // Visible for testing RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, int retryBackoffExpBase, - long retryBackoffMaxMs, double jitter, MemberInfo memberInfo, - boolean retryOnStaleEpoch) { + long retryBackoffMaxMs, double jitter, MemberInfo memberInfo, Optional<Long> expirationTimeMs) { super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter); this.memberInfo = memberInfo; - this.retryOnStaleEpoch = retryOnStaleEpoch; - } - - /** - * Retry with backoff if the request failed with {@link Errors#STALE_MEMBER_EPOCH} and - * the member has valid epoch. - * - * @return True if the request has been enqueued to be retried with the latest member ID - * and epoch. - */ - boolean maybeRetryWithNewMemberEpoch(long currentTimeMs, Errors responseError) { - if (retryOnStaleEpoch && memberInfo.memberEpoch.isPresent()) { - // Request failed with invalid epoch, but the member has a valid one, so - // retry the request with the latest ID/epoch. - maybeRetry(currentTimeMs, responseError.exception()); - return true; - } - return false; + this.expirationTimeMs = expirationTimeMs; } - abstract void maybeRetry(long currentTimeMs, Throwable throwable); - /** * @return String containing the request name and arguments, to be used for logging * purposes. 
@@ -702,6 +809,19 @@ public class CommitRequestManager implements RequestManager, MemberStateListener */ abstract CompletableFuture<?> future(); + /** + * Complete the request future with a TimeoutException if the request timeout has been + * reached, based on the provided current time. + */ + void maybeExpire(long currentTimeMs) { + if (retryTimeoutExpired(currentTimeMs)) { + removeRequest(); + isExpired = true; + future().completeExceptionally(new TimeoutException(requestDescription() + + " could not complete before timeout expired.")); + } + } + /** * Build request with the given builder, including response handling logic. */ @@ -719,26 +839,29 @@ public class CommitRequestManager implements RequestManager, MemberStateListener private void handleClientResponse(final ClientResponse response, final Throwable error, - final long currentTimeMs) { + final long requestCompletionTimeMs) { try { if (error == null) { onResponse(response); } else { log.debug("{} completed with error", requestDescription(), error); - handleCoordinatorDisconnect(error, currentTimeMs); - if (error instanceof RetriableException) { - maybeRetry(currentTimeMs, error); - } else { - future().completeExceptionally(error); - } + onFailedAttempt(requestCompletionTimeMs); + handleCoordinatorDisconnect(error, requestCompletionTimeMs); + future().completeExceptionally(error); } } catch (Throwable t) { - log.error("Unexpected error handling response for ", requestDescription(), t); + log.error("Unexpected error handling response for {}", requestDescription(), t); future().completeExceptionally(t); } } abstract void onResponse(final ClientResponse response); + + boolean retryTimeoutExpired(long currentTimeMs) { + return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs; + } + + abstract void removeRequest(); } class OffsetFetchRequestState extends RetriableRequestState { @@ -748,12 +871,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener */ public 
final Set<TopicPartition> requestedPartitions; - private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; - /** - * Time until which the request should be retried if it fails with retriable errors. + * Future with the result of the request. This can be reset using {@link #resetFuture()} + * to get a new result when the request is retried. */ - private final long expirationTimeMs; + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; public OffsetFetchRequestState(final Set<TopicPartition> partitions, final long retryBackoffMs, @@ -761,10 +883,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener final long expirationTimeMs, final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, - retryBackoffMaxMs, memberInfo, true); + retryBackoffMaxMs, memberInfo, Optional.of(expirationTimeMs)); this.requestedPartitions = partitions; this.future = new CompletableFuture<>(); - this.expirationTimeMs = expirationTimeMs; } public OffsetFetchRequestState(final Set<TopicPartition> partitions, @@ -774,10 +895,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener final double jitter, final MemberInfo memberInfo) { super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, - retryBackoffMaxMs, jitter, memberInfo, true); + retryBackoffMaxMs, jitter, memberInfo, Optional.of(expirationTimeMs)); this.requestedPartitions = partitions; this.future = new CompletableFuture<>(); - this.expirationTimeMs = expirationTimeMs; } public boolean sameRequest(final OffsetFetchRequestState request) { @@ -829,28 +949,22 @@ public class CommitRequestManager implements RequestManager, MemberStateListener private void onFailure(final long currentTimeMs, final Errors responseError) { log.debug("Offset fetch failed: {}", responseError.message()); + onFailedAttempt(currentTimeMs); if (responseError == COORDINATOR_LOAD_IN_PROGRESS) { 
- maybeRetry(currentTimeMs, responseError.exception()); + future.completeExceptionally(responseError.exception()); } else if (responseError == Errors.UNKNOWN_MEMBER_ID) { log.error("OffsetFetch failed with {} because the member is not part of the group" + " anymore.", responseError); future.completeExceptionally(responseError.exception()); } else if (responseError == Errors.STALE_MEMBER_EPOCH) { - if (maybeRetryWithNewMemberEpoch(currentTimeMs, responseError)) { - log.debug("OffsetFetch failed with {} but the consumer is still part" + - " of the group, so the request will be retried with the latest " + - "member ID and epoch.", responseError); - return; - } log.error("OffsetFetch failed with {} and the consumer is not part " + "of the group anymore (it probably left the group, got fenced" + " or failed). The request cannot be retried and will fail.", responseError); future.completeExceptionally(responseError.exception()); - } else if (responseError == Errors.NOT_COORDINATOR - || responseError == Errors.COORDINATOR_NOT_AVAILABLE) { + } else if (responseError == Errors.NOT_COORDINATOR || responseError == Errors.COORDINATOR_NOT_AVAILABLE) { // Re-discover the coordinator and retry coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs); - maybeRetry(currentTimeMs, responseError.exception()); + future.completeExceptionally(responseError.exception()); } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); } else { @@ -860,21 +974,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener } } - /** - * Enqueue the request to be retried with exponential backoff if the time has not expired. - * This will fail the request future if the request is not retried. 
- */ - @Override - void maybeRetry(long currentTimeMs, Throwable throwable) { - if (isExpired(currentTimeMs)) { - future.completeExceptionally(throwable); - return; - } - onFailedAttempt(currentTimeMs); - pendingRequests.inflightOffsetFetches.remove(this); - pendingRequests.addOffsetFetchRequest(this); - } - @Override String requestDescription() { return "OffsetFetch request for partitions " + requestedPartitions; @@ -885,25 +984,23 @@ public class CommitRequestManager implements RequestManager, MemberStateListener return future; } - private boolean isExpired(final long currentTimeMs) { - return expirationTimeMs <= currentTimeMs; + void resetFuture() { + future = new CompletableFuture<>(); } - /** - * Complete the request future with a TimeoutException if the request expired. No action - * taken if the request is still active. - * - * @return True if the request expired. - */ - private boolean maybeExpire(final long currentTimeMs) { - if (isExpired(currentTimeMs)) { - future.completeExceptionally(new TimeoutException("OffsetFetch request could not " + - "complete before timeout expired.")); - return true; + @Override + void removeRequest() { + if (!unsentOffsetFetchRequests().remove(this)) { + log.warn("OffsetFetch request to remove not found in the outbound buffer: {}", this); } - return false; } + /** + * Handle OffsetFetch response that has no group level errors. This will look for + * partition level errors and fail the future accordingly, also recording a failed request + * attempt. If no partition level errors are found, this will complete the future with the + * offsets contained in the response, and record a successful request attempt. 
+ */ private void onSuccess(final long currentTimeMs, final OffsetFetchResponse response) { Set<String> unauthorizedTopics = null; @@ -915,6 +1012,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener TopicPartition tp = entry.getKey(); OffsetFetchResponse.PartitionData partitionData = entry.getValue(); if (partitionData.hasError()) { + onFailedAttempt(currentTimeMs); Errors error = partitionData.error; log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); @@ -954,8 +1052,10 @@ public class CommitRequestManager implements RequestManager, MemberStateListener ", this could be either " + "transactional offsets waiting for completion, or " + "normal offsets waiting for replication after appending to local log", unstableTxnOffsetTopicPartitions); - maybeRetry(currentTimeMs, new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions")); + future.completeExceptionally(new UnstableOffsetCommitException("There are " + + "unstable offsets for the requested topic partitions")); } else { + onSuccessfulAttempt(currentTimeMs); future.complete(offsets); } } @@ -1003,51 +1103,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty(); } - OffsetCommitRequestState addOffsetCommitRequest( - final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - // TODO: Dedupe committing the same offsets to the same partitions - OffsetCommitRequestState requestState = createOffsetCommitRequest( - offsets, - jitter, - expirationTimeMs, - retryOnStaleEpoch); - return addOffsetCommitRequest(requestState); - } - + /** + * Add a commit request to the queue, so that it's sent out on the next call to + * {@link #poll(long)}. This is used from all commits (sync, async, auto-commit). 
+ */ OffsetCommitRequestState addOffsetCommitRequest(OffsetCommitRequestState request) { log.debug("Enqueuing OffsetCommit request for offsets: {}", request.offsets); unsentOffsetCommits.add(request); return request; } - OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final OptionalDouble jitter, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return jitter.isPresent() ? - new OffsetCommitRequestState( - offsets, - groupId, - groupInstanceId, - expirationTimeMs, - retryBackoffMs, - retryBackoffMaxMs, - jitter.getAsDouble(), - memberInfo, - retryOnStaleEpoch) : - new OffsetCommitRequestState( - offsets, - groupId, - groupInstanceId, - expirationTimeMs, - retryBackoffMs, - retryBackoffMaxMs, - memberInfo, - retryOnStaleEpoch); - } - /** * <p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future * to the existing one. @@ -1065,37 +1130,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions); dupe.orElseGet(() -> inflight.get()).chainFuture(request.future); } else { - // remove the request from the outbound buffer: inflightOffsetFetches - request.future.whenComplete((r, t) -> { - if (!inflightOffsetFetches.remove(request)) { - log.warn("A duplicated, inflight, request was identified, but unable to find it in the " + - "outbound buffer:" + request); - } - }); this.unsentOffsetFetches.add(request); } return request.future; } - private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final Set<TopicPartition> partitions, - final long expirationTimeMs) { - OffsetFetchRequestState request = jitter.isPresent() ? 
- new OffsetFetchRequestState( - partitions, - retryBackoffMs, - retryBackoffMaxMs, - expirationTimeMs, - jitter.getAsDouble(), - memberInfo) : - new OffsetFetchRequestState( - partitions, - retryBackoffMs, - retryBackoffMaxMs, - expirationTimeMs, - memberInfo); - return addOffsetFetchRequest(request); - } - /** * Clear {@code unsentOffsetCommits} and moves all the sendable request in {@code * unsentOffsetFetches} to the {@code inflightOffsetFetches} to bookkeep all the inflight @@ -1147,7 +1186,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * futures with a TimeoutException. */ private void failAndRemoveExpiredCommitRequests(final long currentTimeMs) { - unsentOffsetCommits.removeIf(req -> req.maybeExpire(currentTimeMs)); + Queue<OffsetCommitRequestState> requestsToPurge = new LinkedList<>(unsentOffsetCommits); + requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs)); } /** @@ -1155,7 +1195,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener * futures with a TimeoutException. 
*/ private void failAndRemoveExpiredFetchRequests(final long currentTimeMs) { - unsentOffsetFetches.removeIf(req -> req.maybeExpire(currentTimeMs)); + Queue<OffsetFetchRequestState> requestsToPurge = new LinkedList<>(unsentOffsetFetches); + requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs)); } private void clearAll() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 6556495e55b..dd035506d4b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -837,12 +837,10 @@ public class MembershipManagerImpl implements MembershipManager { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. - commitResult = commitRequestManager.maybeAutoCommitAllConsumedNow( - Optional.of(getExpirationTimeForTimeout(rebalanceTimeoutMs)), - true); + commitResult = commitRequestManager.maybeAutoCommitSyncNow(getExpirationTimeForTimeout(rebalanceTimeoutMs)); // Execute commit -> onPartitionsRevoked -> onPartitionsAssigned. 
- commitResult.whenComplete((commitReqResult, commitReqError) -> { + commitResult.whenComplete((__, commitReqError) -> { if (commitReqError != null) { // The call to commit, that includes retry logic for retriable errors, failed to // complete within the time boundaries (fatal error or retriable that did not @@ -887,9 +885,6 @@ public class MembershipManagerImpl implements MembershipManager { revocationResult.thenCompose(__ -> { boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch; if (state == MemberState.RECONCILING && !memberHasRejoined) { - // Reschedule the auto commit starting from now that member has a new assignment. - commitRequestManager.resetAutoCommitTimer(); - // Apply assignment return assignPartitions(assignedTopicIdPartitions, addedPartitions); } else { @@ -916,6 +911,9 @@ public class MembershipManagerImpl implements MembershipManager { } else { boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch; if (state == MemberState.RECONCILING && !memberHasRejoined) { + // Reschedule the auto commit starting from now that the member has a new assignment. + commitRequestManager.resetAutoCommitTimer(); + // Make assignment effective on the broker by transitioning to send acknowledge. 
transitionTo(MemberState.ACKNOWLEDGING); } else { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 4396df27853..ac7ccc56c55 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -24,7 +24,7 @@ import java.util.Objects; public abstract class ApplicationEvent { public enum Type { - COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, + COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, LEAVE_ON_CLOSE diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 24a7acf39a9..9e48b4de6da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -32,7 +32,6 @@ import org.slf4j.Logger; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -69,8 +68,12 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> @Override public void process(ApplicationEvent event) { switch (event.type()) { - case COMMIT: - process((CommitApplicationEvent) event); + case COMMIT_ASYNC: + 
process((AsyncCommitApplicationEvent) event); + return; + + case COMMIT_SYNC: + process((SyncCommitApplicationEvent) event); return; case POLL: @@ -139,18 +142,17 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> requestManagers.heartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs())); } - private void process(final CommitApplicationEvent event) { - if (!requestManagers.commitRequestManager.isPresent()) { - // Leaving this error handling here, but it is a bit strange as the commit API should enforce the group.id - // upfront, so we should never get to this block. - Exception exception = new KafkaException("Unable to commit offset. Most likely because the group.id wasn't set"); - event.future().completeExceptionally(exception); - return; - } + private void process(final AsyncCommitApplicationEvent event) { + CommitRequestManager manager = requestManagers.commitRequestManager.get(); + CompletableFuture<Void> commitResult = manager.commitAsync(event.offsets()); + event.chain(commitResult); + } + private void process(final SyncCommitApplicationEvent event) { CommitRequestManager manager = requestManagers.commitRequestManager.get(); - Optional<Long> expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout); - event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false)); + long expirationTimeoutMs = getExpirationTimeForTimeout(event.retryTimeoutMs()); + CompletableFuture<Void> commitResult = manager.commitSync(event.offsets(), expirationTimeoutMs); + event.chain(commitResult); } private void process(final FetchCommittedOffsetsApplicationEvent event) { @@ -161,7 +163,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> } CommitRequestManager manager = requestManagers.commitRequestManager.get(); long expirationTimeMs = getExpirationTimeForTimeout(event.timeout()); - event.chain(manager.addOffsetFetchRequest(event.partitions(), 
expirationTimeMs)); + event.chain(manager.fetchOffsets(event.partitions(), expirationTimeMs)); } private void process(final NewTopicsMetadataUpdateRequestEvent ignored) { @@ -179,7 +181,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> } CommitRequestManager manager = requestManagers.commitRequestManager.get(); manager.updateAutoCommitTimer(event.currentTimeMs()); - manager.maybeAutoCommitAllConsumedAsync(); + manager.maybeAutoCommitAsync(); } private void process(final ListOffsetsApplicationEvent event) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java new file mode 100644 index 00000000000..7a939ce3cfd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import java.util.Map; + +/** + * Event to commit offsets without waiting for a response, so the request won't be retried. + */ +public class AsyncCommitApplicationEvent extends CommitApplicationEvent { + + public AsyncCommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) { + super(offsets, Type.COMMIT_ASYNC); + } + + @Override + public String toString() { + return "AsyncCommitApplicationEvent{" + + toStringBase() + + ", offsets=" + offsets() + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java index d2205227c49..69d969d7b0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java @@ -21,32 +21,17 @@ import org.apache.kafka.common.TopicPartition; import java.util.Collections; import java.util.Map; -import java.util.Optional; -public class CommitApplicationEvent extends CompletableApplicationEvent<Void> { +public abstract class CommitApplicationEvent extends CompletableApplicationEvent<Void> { /** * Offsets to commit per partition. */ private final Map<TopicPartition, OffsetAndMetadata> offsets; - /** - * Time to wait for a response, retrying on retriable errors. If not present, the request is - * triggered without waiting for a response or being retried. - */ - private final Optional<Long> retryTimeoutMs; - - /** - * Create new event to commit offsets. If timer is present, the request will be retried on - * retriable errors until the timer expires (sync commit offsets request). 
If the timer is - * not present, the request will be sent without waiting for a response of retrying (async - * commit offsets request). - */ - public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> retryTimeoutMs) { - super(Type.COMMIT); + public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, Type type) { + super(type); this.offsets = Collections.unmodifiableMap(offsets); - this.retryTimeoutMs = retryTimeoutMs; for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { if (offsetAndMetadata.offset() < 0) { @@ -59,10 +44,6 @@ public class CommitApplicationEvent extends CompletableApplicationEvent<Void> { return offsets; } - public Optional<Long> retryTimeoutMs() { - return retryTimeoutMs; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -80,13 +61,4 @@ public class CommitApplicationEvent extends CompletableApplicationEvent<Void> { result = 31 * result + offsets.hashCode(); return result; } - - @Override - public String toString() { - return "CommitApplicationEvent{" + - toStringBase() + - ", offsets=" + offsets + - ", retryTimeout=" + (retryTimeoutMs.map(t -> t + "ms").orElse("none")) + - '}'; - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java new file mode 100644 index 00000000000..43dfee6ab18 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import java.util.Map; + +/** + * Event to commit offsets waiting for a response and retrying on expected retriable errors until + * the timer expires. + */ +public class SyncCommitApplicationEvent extends CommitApplicationEvent { + + /** + * Time to wait for a response, retrying on retriable errors. 
+ */ + private final long retryTimeoutMs; + + public SyncCommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, + final long retryTimeoutMs) { + super(offsets, Type.COMMIT_SYNC); + this.retryTimeoutMs = retryTimeoutMs; + } + + public Long retryTimeoutMs() { + return retryTimeoutMs; + } + + @Override + public String toString() { + return "SyncCommitApplicationEvent{" + + toStringBase() + + ", offsets=" + offsets() + + ", retryTimeout=" + retryTimeoutMs + "ms" + + '}'; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 57546951982..2db666e95e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -32,8 +32,8 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; -import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; @@ -48,6 +48,7 @@ import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdat import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent; import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; import org.apache.kafka.common.KafkaException; @@ -252,9 +253,9 @@ public class AsyncKafkaConsumerTest { consumer.commitAsync(offsets, null); - final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class); + final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class); verify(applicationEventHandler).add(commitEventCaptor.capture()); - final CommitApplicationEvent commitEvent = commitEventCaptor.getValue(); + final AsyncCommitApplicationEvent commitEvent = commitEventCaptor.getValue(); assertEquals(offsets, commitEvent.offsets()); assertDoesNotThrow(() -> commitEvent.future().complete(null)); assertDoesNotThrow(() -> consumer.commitAsync(offsets, null)); @@ -266,7 +267,7 @@ public class AsyncKafkaConsumerTest { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L)); - completeCommitApplicationEventSuccessfully(); + completeCommitAsyncApplicationEventSuccessfully(); MockCommitCallback callback = new MockCommitCallback(); assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); @@ -283,7 +284,7 @@ public class AsyncKafkaConsumerTest { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L)); - completeCommitApplicationEventExceptionally(exception); + completeCommitAsyncApplicationEventExceptionally(exception); MockCommitCallback 
callback = new MockCommitCallback(); assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); @@ -306,9 +307,9 @@ public class AsyncKafkaConsumerTest { assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); - final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class); + final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class); verify(applicationEventHandler).add(commitEventCaptor.capture()); - final CommitApplicationEvent commitEvent = commitEventCaptor.getValue(); + final AsyncCommitApplicationEvent commitEvent = commitEventCaptor.getValue(); commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception()); assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync()); @@ -433,7 +434,7 @@ public class AsyncKafkaConsumerTest { sortedPartitions.add(tp); CompletableBackgroundEvent<Void> e = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, sortedPartitions); backgroundEventQueue.add(e); - completeCommitApplicationEventSuccessfully(); + completeCommitSyncApplicationEventSuccessfully(); ConsumerRebalanceListener listener = new ConsumerRebalanceListener() { @Override @@ -478,7 +479,7 @@ public class AsyncKafkaConsumerTest { consumer = newConsumer(); final String currentThread = Thread.currentThread().getName(); MockCommitCallback callback = new MockCommitCallback(); - completeCommitApplicationEventSuccessfully(); + completeCommitAsyncApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); forceCommitCallbackInvocation(); @@ -515,7 +516,7 @@ public class AsyncKafkaConsumerTest { HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>(); topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), "")); topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, 
Optional.of(1), "")); - completeCommitApplicationEventSuccessfully(); + completeCommitSyncApplicationEventSuccessfully(); consumer.assign(Arrays.asList(t0, t1)); @@ -523,7 +524,7 @@ public class AsyncKafkaConsumerTest { verify(metadata).updateLastSeenEpochIfNewer(t0, 2); verify(metadata).updateLastSeenEpochIfNewer(t1, 1); - verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); + verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class)); } @Test @@ -557,14 +558,14 @@ public class AsyncKafkaConsumerTest { verify(metadata).updateLastSeenEpochIfNewer(t0, 2); verify(metadata).updateLastSeenEpochIfNewer(t1, 1); - verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); + verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class)); } @Test public void testEnsurePollExecutedCommitAsyncCallbacks() { consumer = newConsumer(); MockCommitCallback callback = new MockCommitCallback(); - completeCommitApplicationEventSuccessfully(); + completeCommitAsyncApplicationEventSuccessfully(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); @@ -579,7 +580,7 @@ public class AsyncKafkaConsumerTest { public void testEnsureShutdownExecutedCommitAsyncCallbacks() { consumer = newConsumer(); MockCommitCallback callback = new MockCommitCallback(); - completeCommitApplicationEventSuccessfully(); + completeCommitAsyncApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); assertMockCommitCallbackInvoked(() -> consumer.close(), callback, @@ -670,7 +671,7 @@ public class AsyncKafkaConsumerTest { subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); subscriptions.seek(new TopicPartition("topic", 0), 100); consumer.maybeAutoCommitSync(true, time.timer(100), null); - 
verify(applicationEventHandler).add(any(CommitApplicationEvent.class)); + verify(applicationEventHandler).add(any(SyncCommitApplicationEvent.class)); } @Test @@ -688,7 +689,7 @@ public class AsyncKafkaConsumerTest { subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); subscriptions.seek(new TopicPartition("topic", 0), 100); consumer.maybeAutoCommitSync(false, time.timer(100), null); - verify(applicationEventHandler, never()).add(any(CommitApplicationEvent.class)); + verify(applicationEventHandler, never()).add(any(SyncCommitApplicationEvent.class)); } private void assertMockCommitCallbackInvoked(final Executable task, @@ -893,7 +894,7 @@ public class AsyncKafkaConsumerTest { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); - completeCommitApplicationEventSuccessfully(); + completeCommitSyncApplicationEventSuccessfully(); consumer.close(Duration.ZERO); @@ -909,7 +910,7 @@ public class AsyncKafkaConsumerTest { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); - completeCommitApplicationEventSuccessfully(); + completeCommitSyncApplicationEventSuccessfully(); consumer.commitSync(mockTopicPartitionOffset()); @@ -925,7 +926,7 @@ public class AsyncKafkaConsumerTest { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); KafkaException expected = new KafkaException("Test exception"); - completeCommitApplicationEventExceptionally(expected); + completeCommitSyncApplicationEventExceptionally(expected); KafkaException actual = assertThrows(KafkaException.class, () -> consumer.commitSync(mockTopicPartitionOffset())); assertEquals(expected, actual); @@ -941,7 +942,7 @@ public class AsyncKafkaConsumerTest { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); - completeCommitApplicationEventSuccessfully(); + completeCommitAsyncApplicationEventSuccessfully(); 
consumer.commitAsync(mockTopicPartitionOffset(), new MockCommitCallback()); assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); @@ -957,7 +958,7 @@ public class AsyncKafkaConsumerTest { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); - completeCommitApplicationEventExceptionally(new KafkaException("Test exception")); + completeCommitAsyncApplicationEventExceptionally(new KafkaException("Test exception")); consumer.commitAsync(mockTopicPartitionOffset(), new MockCommitCallback()); assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); @@ -1542,20 +1543,36 @@ public class AsyncKafkaConsumerTest { return timestampToSearch; } - private void completeCommitApplicationEventExceptionally(Exception ex) { + private void completeCommitAsyncApplicationEventExceptionally(Exception ex) { doAnswer(invocation -> { - CommitApplicationEvent event = invocation.getArgument(0); + AsyncCommitApplicationEvent event = invocation.getArgument(0); event.future().completeExceptionally(ex); return null; - }).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); + }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class)); } - private void completeCommitApplicationEventSuccessfully() { + private void completeCommitSyncApplicationEventExceptionally(Exception ex) { doAnswer(invocation -> { - CommitApplicationEvent event = invocation.getArgument(0); + SyncCommitApplicationEvent event = invocation.getArgument(0); + event.future().completeExceptionally(ex); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class)); + } + + private void completeCommitAsyncApplicationEventSuccessfully() { + doAnswer(invocation -> { + AsyncCommitApplicationEvent event = invocation.getArgument(0); + event.future().complete(null); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class)); 
+ } + + private void completeCommitSyncApplicationEventSuccessfully() { + doAnswer(invocation -> { + SyncCommitApplicationEvent event = invocation.getArgument(0); event.future().complete(null); return null; - }).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); + }).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class)); } private void completeFetchedCommittedOffsetApplicationEventSuccessfully(final Map<TopicPartition, OffsetAndMetadata> committedOffsets) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 17745fa8660..c27494d69a0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -63,6 +63,7 @@ import java.util.OptionalDouble; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -78,13 +79,16 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static 
org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -126,7 +130,7 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0)); - commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); + commitRequestManger.commitAsync(offsets); assertPoll(false, 0, commitRequestManger); } @@ -137,7 +141,7 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0)); - commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); + commitRequestManger.commitAsync(offsets); assertPoll(1, commitRequestManger); } @@ -177,13 +181,11 @@ public class CommitRequestManagerTest { offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L)); // Add the requests to the CommitRequestManager and store their futures - ArrayList<CompletableFuture<Void>> commitFutures = new ArrayList<>(); - ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> fetchFutures = new ArrayList<>(); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - commitFutures.add(commitManager.addOffsetCommitRequest(offsets1, Optional.of(expirationTimeMs), false)); - fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 0)), expirationTimeMs)); - commitFutures.add(commitManager.addOffsetCommitRequest(offsets2, Optional.of(expirationTimeMs), false)); - fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 1)), expirationTimeMs)); + commitManager.commitSync(offsets1, expirationTimeMs); + commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), expirationTimeMs); + commitManager.commitSync(offsets2, expirationTimeMs); + 
commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), expirationTimeMs); // Poll the CommitRequestManager and verify that the inflightOffsetFetches size is correct NetworkClientDelegate.PollResult result = commitManager.poll(time.milliseconds()); @@ -195,9 +197,16 @@ public class CommitRequestManagerTest { assertFalse(commitManager.pendingRequests.hasUnsentRequests()); assertEquals(2, commitManager.pendingRequests.inflightOffsetFetches.size()); + // Complete requests with a response + result.unsentRequests.forEach(req -> { + if (req.requestBuilder() instanceof OffsetFetchRequest.Builder) { + req.handler().onComplete(buildOffsetFetchClientResponse(req, Collections.emptySet(), Errors.NONE)); + } else { + req.handler().onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>()))); + } + }); + // Verify that the inflight offset fetch requests have been removed from the pending request buffer - commitFutures.forEach(f -> f.complete(null)); - fetchFutures.forEach(f -> f.complete(null)); assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size()); } @@ -208,7 +217,7 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap( new TopicPartition("topic", 1), new OffsetAndMetadata(0)); - commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); + commitRequestManger.commitAsync(offsets); assertEquals(1, commitRequestManger.unsentOffsetCommitRequests().size()); assertEquals(1, commitRequestManger.poll(time.milliseconds()).unsentRequests.size()); assertTrue(commitRequestManger.unsentOffsetCommitRequests().isEmpty()); @@ -255,24 +264,19 @@ public class CommitRequestManagerTest { Errors.NONE)); } - // This is the case of the sync auto commit sent when the consumer is being closed (sync commit - // that should be retried until it succeeds, fails, or timer expires). 
+ // This is the case of the sync commit triggered from an API call to commitSync or when the + // consumer is being closed. It should be retried until it succeeds, fails, or timer expires. @ParameterizedTest @MethodSource("offsetCommitExceptionSupplier") - public void testAutoCommitSyncRetriedAfterExpectedRetriableException(Errors error) { - long commitInterval = retryBackoffMs * 2; - CommitRequestManager commitRequestManger = create(true, commitInterval); - TopicPartition tp = new TopicPartition("topic", 1); - subscriptionState.assignFromUser(Collections.singleton(tp)); - subscriptionState.seek(tp, 100); + public void testCommitSyncRetriedAfterExpectedRetriableException(Errors error) { + CommitRequestManager commitRequestManger = create(false, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - time.sleep(commitInterval); - commitRequestManger.updateAutoCommitTimer(time.milliseconds()); - // Auto-commit all consume sync (ex. triggered when the consumer is closed). + Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap( + new TopicPartition("topic", 1), + new OffsetAndMetadata(0)); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Void> commitResult = - commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs), false); + CompletableFuture<Void> commitResult = commitRequestManger.commitSync(offsets, expirationTimeMs); sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManger, error, commitResult); // We expect that request should have been retried on this sync commit. @@ -292,7 +296,7 @@ public class CommitRequestManagerTest { // Send sync offset commit that fails and verify it propagates the expected exception. 
Long expirationTimeMs = time.milliseconds() + retryBackoffMs; - CompletableFuture<Void> commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), false); + CompletableFuture<Void> commitResult = commitRequestManger.commitSync(offsets, expirationTimeMs); completeOffsetCommitRequestWithError(commitRequestManger, commitError); assertFutureThrows(commitResult, expectedException); } @@ -309,36 +313,25 @@ public class CommitRequestManagerTest { } @Test - public void testOffsetCommitFailsWithCommitFailedExceptionIfUnknownMemberId() { - long commitInterval = retryBackoffMs * 2; - CommitRequestManager commitRequestManger = create(true, commitInterval); - TopicPartition tp = new TopicPartition("topic", 1); - subscriptionState.assignFromUser(Collections.singleton(tp)); - subscriptionState.seek(tp, 100); + public void testCommitSyncFailsWithCommitFailedExceptionIfUnknownMemberId() { + CommitRequestManager commitRequestManger = create(false, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - time.sleep(commitInterval); - commitRequestManger.updateAutoCommitTimer(time.milliseconds()); - // Auto-commit all consume sync (ex. triggered when the consumer is closed). 
+ Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap( + new TopicPartition("topic", 1), + new OffsetAndMetadata(0)); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Void> commitResult = - commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs), false); + CompletableFuture<Void> commitResult = commitRequestManger.commitSync(offsets, expirationTimeMs); completeOffsetCommitRequestWithError(commitRequestManger, Errors.UNKNOWN_MEMBER_ID); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); - // Commit should fail with CommitFailedException assertTrue(commitResult.isDone()); assertFutureThrows(commitResult, CommitFailedException.class); } - /** - * This is the case where a request to commit offsets is performed without retrying on - * STALE_MEMBER_EPOCH (ex. commits triggered from the consumer API, auto-commits triggered on - * the interval). The expectation is that the request should fail with CommitFailedException. 
- */ @Test - public void testOffsetCommitFailsWithCommitFailedExceptionIfStaleMemberEpochNotRetried() { + public void testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() { CommitRequestManager commitRequestManger = create(true, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); @@ -347,8 +340,8 @@ public class CommitRequestManagerTest { new OffsetAndMetadata(0)); // Send commit request expected to be retried on retriable errors - CompletableFuture<Void> commitResult = commitRequestManger.addOffsetCommitRequest( - offsets, Optional.of(time.milliseconds() + defaultApiTimeoutMs), false); + CompletableFuture<Void> commitResult = commitRequestManger.commitSync( + offsets, time.milliseconds() + defaultApiTimeoutMs); completeOffsetCommitRequestWithError(commitRequestManger, Errors.STALE_MEMBER_EPOCH); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); @@ -359,29 +352,35 @@ public class CommitRequestManagerTest { } /** - * This is the case of the async auto commit request triggered on the interval. The - * expectation is that the request should fail with RetriableCommitFailedException, so that - * the timer is reset and the request is retried on the next interval. + * This is the case of the async auto commit request triggered on the interval. The request + * internally fails with the fatal stale epoch error, and the expectation is that it just + * resets the commit timer to the interval, to attempt again when the interval expires. 
*/ @Test - public void testCommitAsyncFailsWithRetriableOnStaleMemberEpoch() { - CommitRequestManager commitRequestManger = create(false, 100); + public void testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheInterval() { + CommitRequestManager commitRequestManger = create(true, 100); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + TopicPartition t1p = new TopicPartition("topic1", 0); + subscriptionState.assignFromUser(singleton(t1p)); + subscriptionState.seek(t1p, 10); - Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap( - new TopicPartition("topic", 1), - new OffsetAndMetadata(0)); - - // Async commit that won't be retried. - CompletableFuture<Void> commitResult = commitRequestManger.addOffsetCommitRequest( - offsets, Optional.empty(), true); + // Async commit on the interval fails with fatal stale epoch and just resets the timer to + // the interval + commitRequestManger.maybeAutoCommitAsync(); completeOffsetCommitRequestWithError(commitRequestManger, Errors.STALE_MEMBER_EPOCH); + verify(commitRequestManger).resetAutoCommitTimer(); + + // Async commit retried, only when the interval expires NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - assertEquals(0, res.unsentRequests.size()); + assertEquals(0, res.unsentRequests.size(), "No request should be generated until the " + + "interval expires"); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + res = commitRequestManger.poll(time.milliseconds()); + assertEquals(1, res.unsentRequests.size()); - // Commit should fail with RetriableCommitFailedException. 
- assertTrue(commitResult.isDone()); - assertFutureThrows(commitResult, RetriableCommitFailedException.class); } @Test @@ -394,8 +393,7 @@ public class CommitRequestManagerTest { new OffsetAndMetadata(0)); // Async commit that won't be retried. - CompletableFuture<Void> commitResult = commitRequestManager.addOffsetCommitRequest( - offsets, Optional.empty(), true); + CompletableFuture<Void> commitResult = commitRequestManager.commitAsync(offsets); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -473,11 +471,10 @@ public class CommitRequestManagerTest { CommitRequestManager commitRequestManger = create(true, 100); time.sleep(100); commitRequestManger.updateAutoCommitTimer(time.milliseconds()); - CompletableFuture<Void> result = commitRequestManger.maybeAutoCommitAllConsumedAsync(); - + // CompletableFuture<Void> result = commitRequestManger.maybeAutoCommitAllConsumedAsync(); + commitRequestManger.maybeAutoCommitAsync(); assertTrue(commitRequestManger.pendingRequests.unsentOffsetCommits.isEmpty()); - assertTrue(result.isDone()); - assertFalse(result.isCompletedExceptionally()); + verify(commitRequestManger).resetAutoCommitTimer(); } @Test @@ -489,18 +486,17 @@ public class CommitRequestManagerTest { // Auto-commit of empty offsets time.sleep(100); commitRequestManger.updateAutoCommitTimer(time.milliseconds()); - CompletableFuture<Void> result = commitRequestManger.maybeAutoCommitAllConsumedAsync(); - assertTrue(result.isDone()); - assertFalse(result.isCompletedExceptionally()); + commitRequestManger.maybeAutoCommitAsync(); // Next auto-commit consumed offsets (not empty). 
Should generate a request, ensuring // that the previous auto-commit of empty did not leave the inflight request flag on subscriptionState.seek(t1p, 100); time.sleep(100); commitRequestManger.updateAutoCommitTimer(time.milliseconds()); - result = commitRequestManger.maybeAutoCommitAllConsumedAsync(); - assertFalse(commitRequestManger.pendingRequests.unsentOffsetCommits.isEmpty()); - assertFalse(result.isDone()); + commitRequestManger.maybeAutoCommitAsync(); + assertEquals(1, commitRequestManger.pendingRequests.unsentOffsetCommits.size()); + + verify(commitRequestManger, times(2)).resetAutoCommitTimer(); } @Test @@ -534,7 +530,7 @@ public class CommitRequestManagerTest { List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests( commitRequestManger, partitions, - 5, + 1, error); // we only want to make sure to purge the outbound buffer for non-retriables, so retriable will be re-queued. if (isRetriable) @@ -545,6 +541,49 @@ public class CommitRequestManagerTest { } } + @Test + public void testSuccessfulOffsetFetch() { + CommitRequestManager commitManager = create(false, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = + commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), + expirationTimeMs); + + // Send fetch request + NetworkClientDelegate.PollResult result = commitManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + assertEquals(1, commitManager.pendingRequests.inflightOffsetFetches.size()); + assertFalse(fetchResult.isDone()); + + // Complete request with a response + TopicPartition tp = new TopicPartition("topic1", 0); + long expectedOffset = 100; + NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0); + Map<TopicPartition, 
OffsetFetchResponse.PartitionData> topicPartitionData = + Collections.singletonMap( + tp, + new OffsetFetchResponse.PartitionData(expectedOffset, Optional.of(1), "", Errors.NONE)); + req.handler().onComplete(buildOffsetFetchClientResponse(req, topicPartitionData, Errors.NONE, false)); + + // Validate request future completes with the response received + assertTrue(fetchResult.isDone()); + assertFalse(fetchResult.isCompletedExceptionally()); + Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = null; + try { + offsetsAndMetadata = fetchResult.get(); + } catch (InterruptedException | ExecutionException e) { + fail(e); + } + assertNotNull(offsetsAndMetadata); + assertEquals(1, offsetsAndMetadata.size()); + assertTrue(offsetsAndMetadata.containsKey(tp)); + assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset()); + assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " + + "request should be removed from the queue when a response is received."); + } + @ParameterizedTest @MethodSource("offsetFetchRetriableCoordinatorErrors") public void testOffsetFetchMarksCoordinatorUnknownOnRetriableCoordinatorErrors(Errors error, @@ -556,7 +595,7 @@ public class CommitRequestManagerTest { partitions.add(new TopicPartition("t1", 0)); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = commitRequestManager.addOffsetFetchRequest(partitions, expirationTimeMs); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = commitRequestManager.fetchOffsets(partitions, expirationTimeMs); completeOffsetFetchRequestWithError(commitRequestManager, partitions, error); @@ -583,7 +622,7 @@ public class CommitRequestManagerTest { partitions.add(new TopicPartition("t1", 0)); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManger.addOffsetFetchRequest(partitions, expirationTimeMs); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = commitRequestManger.fetchOffsets(partitions, expirationTimeMs); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -595,8 +634,7 @@ public class CommitRequestManagerTest { assertFalse(result.isDone()); assertCoordinatorDisconnect(); - // Request should be retried after backoff expires - time.sleep(100); + time.sleep(retryBackoffMs); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); res = commitRequestManger.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -612,8 +650,8 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0)); - // Send commit request without expiration (async commit not expected to be retried). - CompletableFuture<Void> commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); + // Send async commit (not expected to be retried). + CompletableFuture<Void> commitResult = commitRequestManger.commitAsync(offsets); completeOffsetCommitRequestWithError(commitRequestManger, error); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); @@ -638,7 +676,7 @@ public class CommitRequestManagerTest { // Send sync offset commit request that fails with retriable error. 
Long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; - CompletableFuture<Void> commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), false); + CompletableFuture<Void> commitResult = commitRequestManger.commitSync(offsets, expirationTimeMs); completeOffsetCommitRequestWithError(commitRequestManger, Errors.REQUEST_TIMED_OUT); // Request retried after backoff, and fails with retriable again. Should not complete yet @@ -670,7 +708,7 @@ public class CommitRequestManagerTest { // Send offset commit request that fails with retriable error. long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; - CompletableFuture<Void> commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), false); + CompletableFuture<Void> commitResult = commitRequestManger.commitSync(offsets, expirationTimeMs); completeOffsetCommitRequestWithError(commitRequestManger, Errors.COORDINATOR_NOT_AVAILABLE); // Sleep to expire the request timeout. Request should fail on the next poll with a @@ -696,7 +734,7 @@ public class CommitRequestManagerTest { // Send async commit request that fails with retriable error (not expected to be retried). 
Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE; - CompletableFuture<Void> commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); + CompletableFuture<Void> commitResult = commitRequestManger.commitAsync(offsets); completeOffsetCommitRequestWithError(commitRequestManger, retriableError); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); @@ -710,7 +748,6 @@ public class CommitRequestManagerTest { assertFutureThrows(commitResult, RetriableCommitFailedException.class); } - @Test public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() { CommitRequestManager commitRequestManger = create(true, 100); @@ -720,7 +757,7 @@ public class CommitRequestManagerTest { new OffsetAndMetadata(0)); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - commitRequestManger.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), false); + commitRequestManger.commitSync(offsets, expirationTimeMs); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException()); @@ -802,7 +839,7 @@ public class CommitRequestManagerTest { // Send request that is expected to fail with invalid epoch. long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - commitRequestManager.addOffsetFetchRequest(partitions, expirationTimeMs); + commitRequestManager.fetchOffsets(partitions, expirationTimeMs); // Mock member has new a valid epoch. int newEpoch = 8; @@ -842,7 +879,7 @@ public class CommitRequestManagerTest { // Send request that is expected to fail with invalid epoch. 
long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestResult = - commitRequestManager.addOffsetFetchRequest(partitions, expirationTimeMs); + commitRequestManager.fetchOffsets(partitions, expirationTimeMs); // Mock member not having a valid epoch anymore (left/failed/fenced). commitRequestManager.onMemberEpochUpdated(Optional.empty(), Optional.empty()); @@ -862,40 +899,58 @@ public class CommitRequestManagerTest { // STALE_MEMBER_EPOCH (ex. triggered from the reconciliation process), fails with invalid // member epoch. The expectation is that if the member already has a new epoch, the request // should be retried with the new epoch. - @Test - public void testOffsetCommitFailsWithStaleEpochAndRetriesWithNewEpoch() { - CommitRequestManager commitRequestManager = create(true, 100); + @ParameterizedTest + @MethodSource("offsetCommitExceptionSupplier") + public void testAutoCommitSyncBeforeRevocationRetriesOnRetriableAndStaleEpoch(Errors error) { + // Enable auto-commit but with very long interval to avoid triggering auto-commits on the + // interval and just test the auto-commits triggered before revocation + CommitRequestManager commitRequestManager = create(true, Integer.MAX_VALUE); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), - new OffsetAndMetadata(0)); + TopicPartition tp = new TopicPartition("topic", 1); + subscriptionState.assignFromUser(singleton(tp)); + subscriptionState.seek(tp, 5); + long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2; - // Send commit request expected to be retried on STALE_MEMBER_EPOCH error while it does not expire. 
- long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; - commitRequestManager.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), true); + // Send commit request expected to be retried on STALE_MEMBER_EPOCH error while it does not expire + commitRequestManager.maybeAutoCommitSyncNow(expirationTimeMs); - // Mock member has new a valid epoch. int newEpoch = 8; String memberId = "member1"; - commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), Optional.of(memberId)); - - completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH); - - // Check that the request that failed was removed from the inflight requests buffer. - assertEquals(0, commitRequestManager.pendingRequests.inflightOffsetFetches.size()); - assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetCommits.size()); - - // Request should be retried with backoff. - NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); - assertEquals(0, res.unsentRequests.size()); - time.sleep(retryBackoffMs); - res = commitRequestManager.poll(time.milliseconds()); - assertEquals(1, res.unsentRequests.size()); + if (error == Errors.STALE_MEMBER_EPOCH) { + // Mock member has new a valid epoch + commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), Optional.of(memberId)); + } - // The retried request should include the latest member ID and epoch. 
- OffsetCommitRequestData reqData = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); - assertEquals(newEpoch, reqData.generationIdOrMemberEpoch()); - assertEquals(memberId, reqData.memberId()); + completeOffsetCommitRequestWithError(commitRequestManager, error); + + if (error.exception() instanceof RetriableException || error == Errors.STALE_MEMBER_EPOCH) { + assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetCommits.size(), + "Request to be retried should be added to the outbound queue"); + + // Request should be retried with backoff + NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); + assertEquals(0, res.unsentRequests.size()); + time.sleep(retryBackoffMs); + res = commitRequestManager.poll(time.milliseconds()); + assertEquals(1, res.unsentRequests.size()); + if (error == Errors.STALE_MEMBER_EPOCH) { + // The retried request should include the latest member ID and epoch + OffsetCommitRequestData reqData = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); + assertEquals(newEpoch, reqData.generationIdOrMemberEpoch()); + assertEquals(memberId, reqData.memberId()); + } + } else { + assertEquals(0, commitRequestManager.pendingRequests.unsentOffsetCommits.size(), + "Non-retriable failed request should be removed from the outbound queue"); + + // Request should not be retried, even after the backoff expires + NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); + assertEquals(0, res.unsentRequests.size()); + time.sleep(retryBackoffMs); + res = commitRequestManager.poll(time.milliseconds()); + assertEquals(0, res.unsentRequests.size()); + } } @Test @@ -920,7 +975,7 @@ public class CommitRequestManagerTest { new OffsetAndMetadata(0)); long commitCreationTimeMs = time.milliseconds(); - commitRequestManager.addOffsetCommitRequest(offsets, Optional.empty(), true); + commitRequestManager.commitAsync(offsets); 
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -1029,7 +1084,7 @@ public class CommitRequestManagerTest { partitions.add(tp2); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = - commitRequestManger.addOffsetFetchRequest(partitions, expirationTimeMs); + commitRequestManger.fetchOffsets(partitions, expirationTimeMs); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -1058,7 +1113,7 @@ public class CommitRequestManagerTest { Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0)); - commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); + commitRequestManger.commitAsync(offsets); commitRequestManger.signalClose(); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -1089,7 +1144,7 @@ public class CommitRequestManagerTest { List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<>(); long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs; for (int i = 0; i < numRequest; i++) { - futures.add(commitRequestManger.addOffsetFetchRequest(partitions, expirationTimeMs)); + futures.add(commitRequestManger.fetchOffsets(partitions, expirationTimeMs)); } NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 4ff652ff493..a491df417de 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -21,11 +21,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; +import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; import org.apache.kafka.common.Node; @@ -135,7 +137,7 @@ public class ConsumerNetworkThreadTest { @Test public void testApplicationEvent() { - ApplicationEvent e = new CommitApplicationEvent(new HashMap<>(), Optional.empty()); + ApplicationEvent e = new PollApplicationEvent(100); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); verify(applicationEventProcessor, times(1)).process(e); @@ -150,11 +152,19 @@ public class ConsumerNetworkThreadTest { } @Test - public void testCommitEvent() { - ApplicationEvent e = new CommitApplicationEvent(new HashMap<>(), Optional.empty()); + public void testAsyncCommitEvent() { + ApplicationEvent e = new 
AsyncCommitApplicationEvent(new HashMap<>()); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(CommitApplicationEvent.class)); + verify(applicationEventProcessor).process(any(AsyncCommitApplicationEvent.class)); + } + + @Test + public void testSyncCommitEvent() { + ApplicationEvent e = new SyncCommitApplicationEvent(new HashMap<>(), 100L); + applicationEventsQueue.add(e); + consumerNetworkThread.runOnce(); + verify(applicationEventProcessor).process(any(SyncCommitApplicationEvent.class)); } @Test @@ -209,7 +219,7 @@ public class ConsumerNetworkThreadTest { verify(networkClient, times(1)).poll(anyLong(), anyLong()); verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); // Assignment change should generate an async commit (not retried). - verify(commitRequestManager, times(1)).maybeAutoCommitAllConsumedAsync(); + verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); } @Test @@ -273,8 +283,8 @@ public class ConsumerNetworkThreadTest { coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); - CompletableApplicationEvent<Void> event1 = spy(new CommitApplicationEvent(Collections.emptyMap(), Optional.empty())); - ApplicationEvent event2 = new CommitApplicationEvent(Collections.emptyMap(), Optional.empty()); + CompletableApplicationEvent<Void> event1 = spy(new AsyncCommitApplicationEvent(Collections.emptyMap())); + ApplicationEvent event2 = new AsyncCommitApplicationEvent(Collections.emptyMap()); CompletableFuture<Void> future = new CompletableFuture<>(); when(event1.future()).thenReturn(future); applicationEventsQueue.add(event1); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 2d8ef9397f3..f5c65d58bfc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -69,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; @@ -1922,6 +1923,7 @@ public class MembershipManagerImplTest { Map<Uuid, SortedSet<Integer>> assignmentByTopicId = assignmentByTopicId(expectedAssignment); assertEquals(assignmentByTopicId, membershipManager.currentAssignment()); + // The auto-commit interval should be reset (only once), when the reconciliation completes verify(commitRequestManager).resetAutoCommitTimer(); } @@ -1947,7 +1949,7 @@ public class MembershipManagerImplTest { if (withAutoCommit) { when(commitRequestManager.autoCommitEnabled()).thenReturn(true); CompletableFuture<Void> commitResult = new CompletableFuture<>(); - when(commitRequestManager.maybeAutoCommitAllConsumedNow(any(), anyBoolean())).thenReturn(commitResult); + when(commitRequestManager.maybeAutoCommitSyncNow(anyLong())).thenReturn(commitResult); return commitResult; } else { return CompletableFuture.completedFuture(null);