cadonna commented on code in PR #16031: URL: https://github.com/apache/kafka/pull/16031#discussion_r1625726216
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -319,11 +326,11 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState if (error == null) { result.complete(null); } else { - if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { - if (error instanceof TimeoutException && requestAttempt.isExpired()) { - log.debug("Auto-commit sync before revocation timed out and won't be retried anymore"); - result.completeExceptionally(error); - } else if (error instanceof UnknownTopicOrPartitionException) { + if (requestAttempt.isExpired()) { Review Comment: Follow-up to https://github.com/apache/kafka/pull/16031#discussion_r1624620726 I am not sure if this is good. Let's assume I get a non-retriable error AND the timeout was exceeded. Do I really want to get a timeout exception wrapped around the non-retriable error? I want a timeout exception when I can improve the situation by increasing the timeout. If I get a non-retriable error, I cannot improve the situation by increasing the timeout. So, I would only throw a timeout exception when I get a retriable error. WDYT? This affects also the other location where the expiration is verified. Either way, tests are required that test the combination of (non-)retriable error and expiration. ########## core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala: ########## @@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { runMultiConsumerSessionTimeoutTest(true) } + // Ensure TestUtils polls with ZERO. This fails for the new consumer only. Review Comment: I do not understand this comment. Could you please elaborate? If I look where the poll is done, a duration of 100 ms is used. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -465,39 +474,39 @@ private Throwable commitAsyncExceptionForError(Throwable error) { * Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}. * * @param partitions Partitions to fetch offsets for. - * @param expirationTimeMs Time until which the request should be retried if it fails + * @param deadlineMs Time until which the request should be retried if it fails Review Comment: nit: the indentation is not consistent ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -391,7 +398,7 @@ public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMe * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired. * * @param offsets Offsets to commit - * @param deadlineMs Time until which the request will be retried if it fails with + * @param deadlineMs Time until which the request will be retried if it fails with Review Comment: nit: The indentation is not consistent. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -469,7 +474,7 @@ private Throwable commitAsyncExceptionForError(Throwable error) { * Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}. * * @param partitions Partitions to fetch offsets for. - * @param deadlineMs Time until which the request should be retried if it fails Review Comment: nit: revert ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { Review Comment: Could you add unit tests for this class? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + + private final Timer timer; + + public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); + this.timer = timer; + } + + TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final int retryBackoffExpBase, + final long retryBackoffMaxMs, + final double jitter, + final Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter); + this.timer = timer; + } + + public boolean isExpired() { + timer.update(); + return timer.isExpired(); + } + + public long remainingMs() { + timer.update(); + return timer.remainingMs(); + } + + public static Timer deadlineTimer(final Time time, final long deadlineMs) { + // Prevent the timer from being negative. Review Comment: I do not think that you need to explain this. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +/** + * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track + * of the request's expiration. + */ +public class TimedRequestState extends RequestState { + + private final Timer timer; + + public TimedRequestState(final LogContext logContext, + final String owner, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final Timer timer) { + super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); + this.timer = timer; + } + + TimedRequestState(final LogContext logContext, Review Comment: Both of this constructors are only called in child constructors. Why is one package-private and the other public? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState { */ abstract CompletableFuture<?> future(); - /** - * Complete the request future with a TimeoutException if the request timeout has been - * reached, based on the provided current time. - */ - void maybeExpire(long currentTimeMs) { - if (retryTimeoutExpired(currentTimeMs)) { - removeRequest(); - isExpired = true; - future().completeExceptionally(new TimeoutException(requestDescription() + - " could not complete before timeout expired.")); - } - } - /** * Build request with the given builder, including response handling logic. */ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( builder, - coordinatorRequestManager.coordinator()); + coordinatorRequestManager.coordinator(), + time.timer(requestTimeoutMs) + ); request.whenComplete( (response, throwable) -> { long currentTimeMs = request.handler().completionTimeMs(); Review Comment: I will let you decide what to do. It just caught me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org