kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1337444456
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close function
+                // itself may trigger rebalance callback that needs the consumer to be open still
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at-most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);

Review Comment:
Removed the unnecessary timer.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org