cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1625726216


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -319,11 +326,11 @@ private void 
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
             if (error == null) {
                 result.complete(null);
             } else {
-                if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-                    if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {
-                        log.debug("Auto-commit sync before revocation timed 
out and won't be retried anymore");
-                        result.completeExceptionally(error);
-                    } else if (error instanceof 
UnknownTopicOrPartitionException) {
+                if (requestAttempt.isExpired()) {

Review Comment:
   Follow-up to 
https://github.com/apache/kafka/pull/16031#discussion_r1624620726
   
   I am not sure if this is good. Let's assume I get a non-retriable error AND 
the timeout was exceeded. Do I really want to get a timeout exception wrapped 
around the non-retriable error? I want a timeout exception when I can improve 
the situation by increasing the timeout. If I get a non-retriable error, I 
cannot improve the situation by increasing the timeout.
   So, I would only throw a timeout exception when I get a retriable error.
   WDYT?
   
   This affects also the other location where the expiration is verified.
   
   Either way, tests are required that test the combination of (non-)retriable 
error and expiration.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##########
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends 
AbstractConsumerTest {
     runMultiConsumerSessionTimeoutTest(true)
   }
 
+  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.

Review Comment:
   I do not understand this comment. Could you please elaborate?
   If I look where the poll is done, a duration of 100 ms is used.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -465,39 +474,39 @@ private Throwable commitAsyncExceptionForError(Throwable 
error) {
      * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
      *
      * @param partitions       Partitions to fetch offsets for.
-     * @param expirationTimeMs Time until which the request should be retried 
if it fails
+     * @param deadlineMs Time until which the request should be retried if it 
fails

Review Comment:
   nit: the indentation is not consistent



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -391,7 +398,7 @@ public CompletableFuture<Void> commitAsync(final 
Map<TopicPartition, OffsetAndMe
      * Commit offsets, retrying on expected retriable errors while the retry 
timeout hasn't expired.
      *
      * @param offsets               Offsets to commit
-     * @param deadlineMs            Time until which the request will be 
retried if it fails with
+     * @param deadlineMs      Time until which the request will be retried if 
it fails with

Review Comment:
   nit: The indentation is not consistent.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -469,7 +474,7 @@ private Throwable commitAsyncExceptionForError(Throwable 
error) {
      * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
      *
      * @param partitions       Partitions to fetch offsets for.
-     * @param deadlineMs       Time until which the request should be retried 
if it fails

Review Comment:
   nit: revert



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {

Review Comment:
   Could you add unit tests for this class?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+    private final Timer timer;
+
+    public TimedRequestState(final LogContext logContext,
+                             final String owner,
+                             final long retryBackoffMs,
+                             final long retryBackoffMaxMs,
+                             final Timer timer) {
+        super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+        this.timer = timer;
+    }
+
+    TimedRequestState(final LogContext logContext,
+                      final String owner,
+                      final long retryBackoffMs,
+                      final int retryBackoffExpBase,
+                      final long retryBackoffMaxMs,
+                      final double jitter,
+                      final Timer timer) {
+        super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+        this.timer = timer;
+    }
+
+    public boolean isExpired() {
+        timer.update();
+        return timer.isExpired();
+    }
+
+    public long remainingMs() {
+        timer.update();
+        return timer.remainingMs();
+    }
+
+    public static Timer deadlineTimer(final Time time, final long deadlineMs) {
+        // Prevent the timer from being negative.

Review Comment:
   I do not think that you need to explain this.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+    private final Timer timer;
+
+    public TimedRequestState(final LogContext logContext,
+                             final String owner,
+                             final long retryBackoffMs,
+                             final long retryBackoffMaxMs,
+                             final Timer timer) {
+        super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+        this.timer = timer;
+    }
+
+    TimedRequestState(final LogContext logContext,

Review Comment:
   Both of this constructors are only called in child constructors. Why is one 
package-private and the other public?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState 
{
          */
         abstract CompletableFuture<?> future();
 
-        /**
-         * Complete the request future with a TimeoutException if the request 
timeout has been
-         * reached, based on the provided current time.
-         */
-        void maybeExpire(long currentTimeMs) {
-            if (retryTimeoutExpired(currentTimeMs)) {
-                removeRequest();
-                isExpired = true;
-                future().completeExceptionally(new 
TimeoutException(requestDescription() +
-                    " could not complete before timeout expired."));
-            }
-        }
-
         /**
          * Build request with the given builder, including response handling 
logic.
          */
         NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) {
             NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator());
+                coordinatorRequestManager.coordinator(),
+                time.timer(requestTimeoutMs)
+            );
             request.whenComplete(
                 (response, throwable) -> {
                     long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   I will let you decide what to do. It just caught me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to