cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624238381


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState 
{
          */
         abstract CompletableFuture<?> future();
 
-        /**
-         * Complete the request future with a TimeoutException if the request 
timeout has been
-         * reached, based on the provided current time.
-         */
-        void maybeExpire(long currentTimeMs) {
-            if (retryTimeoutExpired(currentTimeMs)) {
-                removeRequest();
-                isExpired = true;
-                future().completeExceptionally(new 
TimeoutException(requestDescription() +
-                    " could not complete before timeout expired."));
-            }
-        }
-
         /**
          * Build request with the given builder, including response handling 
logic.
          */
         NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) {
             NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator());
+                coordinatorRequestManager.coordinator(),
+                time.timer(requestTimeoutMs)
+            );
             request.whenComplete(
                 (response, throwable) -> {
                     long currentTimeMs = request.handler().completionTimeMs();

Review Comment:
   Not directly related to this PR: Why is this variable called `currentTimeMs`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+    private final Timer timer;
+
+    public TimedRequestState(final LogContext logContext,
+                             final String owner,
+                             final long retryBackoffMs,
+                             final long retryBackoffMaxMs,
+                             final Timer timer) {
+        super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+        this.timer = timer;
+    }
+
+    // Visible for testing

Review Comment:
   This does not seem to be true. The constructor is not called in tests and 
also the constructors that call this constructor are not called in tests. That 
is one reason I do not like this kind of comments, because they easily start to 
lie. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -235,7 +235,6 @@ public void addAll(final List<UnsentRequest> requests) {
 
     public void add(final UnsentRequest r) {
         Objects.requireNonNull(r);
-        r.setTimer(this.time, this.requestTimeoutMs);

Review Comment:
   I guess it does not make a big difference if the timer is set when the 
unsent request is created vs. when the unsent request is added to the network 
client, right? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState 
requestAttempt,
                 result.complete(null);
             } else {
                 if (error instanceof RetriableException) {
-                    if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+                    if (error instanceof TimeoutException && 
requestAttempt.isExpired()) {

Review Comment:
   Why do we only check expiration in case of a `TimeoutException` but not for 
other retriable exceptions?
   The `TopicMetadataRequestManager` handles this differently.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -960,7 +960,7 @@ void maybeReconcile() {
         // best effort to commit the offsets in the case where the epoch might 
have changed while
         // the current reconciliation is in process. Note this is using the 
rebalance timeout as
         // it is the limit enforced by the broker to complete the 
reconciliation process.
-        commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+        commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));

Review Comment:
   Not directly related to this PR: is it OK to use the rebalance timeout here 
instead of using a timer that computes the remaining time until the rebalance 
timeout is expired?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to