kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337444456


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be 
negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close 
function
+                // itself may trigger rebalance callback that needs the 
consumer to be open still
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at-most the 
requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - 
closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, 
remainingDurationInTimeout);

Review Comment:
   Removed the unnecessary timer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to