kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1334885217
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in the constructor prior to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close function
+                // itself may trigger a rebalance callback that needs the consumer to still be open
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
+            }
+
+            closeTimer.reset(remainingDurationInTimeout);
+
+            // This is a blocking call bound by the time remaining in closeTimer

Review Comment:
   True. There don't appear to be any blocking calls made from the `FetchBuffer.close()` path. I'll dig around some more.
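The comment above is about bounding a potentially blocking shutdown step by the time remaining in the caller's close budget. As a minimal sketch of that pattern (this is illustrative plain Java, not Kafka code; `closeWithin` is a hypothetical name), a blocking wait can be capped at the remaining duration:

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedCloseSketch {
    // Hypothetical helper: shut down a resource, blocking for at most the
    // remaining close budget. Returns false if the budget is exhausted first.
    static boolean closeWithin(ExecutorService pool, Duration remaining) throws InterruptedException {
        pool.shutdown(); // stop accepting new work; does not block
        // Blocks for at most the remaining budget, mirroring how a close
        // bounded by a Timer would use its remaining time as the wait limit.
        return pool.awaitTermination(remaining.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        boolean terminated = closeWithin(pool, Duration.ofSeconds(1));
        assert terminated; // an idle pool terminates well within the budget
    }
}
```

If `FetchBuffer.close()` truly never blocks, the timer bound is harmless but unnecessary for that step, which is the point the review comment is probing.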
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in the constructor prior to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close function
+                // itself may trigger a rebalance callback that needs the consumer to still be open
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);

Review Comment:
   True. We don't have to account for the time it takes for the coordinator to close because we don't have a coordinator 😄. I'll remove that.
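The clamping arithmetic under discussion (floor the remaining budget at zero, then cap any positive remainder at `requestTimeoutMs`) can be isolated for clarity. This is a hedged sketch only; `computeRemainingCloseTimeout` is a hypothetical name, not a method in the PR:

```java
import java.time.Duration;

public class CloseTimeoutSketch {
    // Mirrors the clamping logic in the diff: remaining time is the caller's
    // timeout minus elapsed time, floored at 0, and any positive remainder is
    // capped at requestTimeoutMs so a single request never exceeds its limit.
    static long computeRemainingCloseTimeout(Duration timeout, long elapsedMs, long requestTimeoutMs) {
        long remaining = Math.max(0, timeout.toMillis() - elapsedMs);
        if (remaining > 0) {
            remaining = Math.min(requestTimeoutMs, remaining);
        }
        return remaining;
    }

    public static void main(String[] args) {
        // 60s budget with 100ms elapsed is capped at the 30s request timeout
        assert computeRemainingCloseTimeout(Duration.ofSeconds(60), 100, 30_000) == 30_000;
        // 1s budget with 400ms elapsed leaves 600ms, under the cap
        assert computeRemainingCloseTimeout(Duration.ofSeconds(1), 400, 30_000) == 600;
        // an exhausted budget clamps to 0 rather than going negative
        assert computeRemainingCloseTimeout(Duration.ofMillis(50), 100, 30_000) == 0;
    }
}
```

Note the cap is only applied when time remains; an already-expired budget stays at zero instead of being bumped up to `requestTimeoutMs`.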
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org