cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1624238381
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState { */ abstract CompletableFuture<?> future(); - /** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. - */ - void maybeExpire(long currentTimeMs) { - if (retryTimeoutExpired(currentTimeMs)) { - removeRequest(); - isExpired = true; - future().completeExceptionally(new TimeoutException(requestDescription() + - " could not complete before timeout expired.")); - } - } - /** * Build request with the given builder, including response handling logic. */ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, - coordinatorRequestManager.coordinator()); + coordinatorRequestManager.coordinator(), + time.timer(requestTimeoutMs) + ); request.whenComplete( (response, throwable) -> { long currentTimeMs = request.handler().completionTimeMs(); Review Comment: Not directly related to this PR: Why is this variable called `currentTimeMs`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + + private final Timer timer; + + public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); + this.timer = timer; + } + + // Visible for testing Review Comment: This does not seem to be true. The constructor is not called in tests and also the constructors that call this constructor are not called in tests. That is one reason I do not like this kind of comments, because they easily start to lie. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -235,7 +235,6 @@ public void addAll(final List<UnsentRequest> requests) { public void add(final UnsentRequest r) { Objects.requireNonNull(r); - r.setTimer(this.time, this.requestTimeoutMs); Review Comment: I guess it does not make a big difference if the timer is set when the unsent request is created vs. when the unsent request is added to the network client, right? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, result.complete(null); } else { if (error instanceof RetriableException) { - if (error instanceof TimeoutException && requestAttempt.isExpired) { + if (error instanceof TimeoutException && requestAttempt.isExpired()) { Review Comment: Why do we only check expiration in case of a `TimeoutException` but not for other retriable exceptions? The `TopicMetadataRequestManager` handles this differently. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -960,7 +960,7 @@ void maybeReconcile() { // best effort to commit the offsets in the case where the epoch might have changed while // the current reconciliation is in process. Note this is using the rebalance timeout as // it is the limit enforced by the broker to complete the reconciliation process. - commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs)); + commitResult = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs)); Review Comment: Not directly related to this PR: is it OK to use the rebalance timeout here instead of using a timer that computes the remaining time until the rebalance timeout is expired? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org