kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1334885217


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close function
+                // itself may trigger rebalance callback that needs the consumer to be open still
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at-most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
+            }
+
+            closeTimer.reset(remainingDurationInTimeout);
+
+            // This is a blocking call bound by the time remaining in closeTimer

Review Comment:
   True. There don't appear to be any blocking calls made from the `FetchBuffer.close()` path. I'll dig around some more.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -526,13 +677,57 @@ public void close() {
         close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     @Override
     public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+
+        try {
+            if (!closed) {
+                // need to close before setting the flag since the close function
+                // itself may trigger rebalance callback that needs the consumer to be open still
+                close(timeout, false);
+            }
+        } finally {
+            closed = true;
+        }
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
+        log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        if (fetchBuffer != null) {
+            // the timeout for the session close is at-most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {
+                remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);

Review Comment:
   True. We don't have to account for the time it takes for the coordinator to close because we don't have a coordinator 😄.
   
   I'll remove that.
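
For illustration only, here is a standalone sketch of why the recomputation looks redundant once nothing runs between creating the timer and closing the fetch buffer. The class and method names (`CloseTimeoutSketch`, `boundedCloseTimeoutMs`, `recomputedTimeoutMs`) are hypothetical and not part of the PR; elapsed time is passed in as a plain parameter instead of being read from Kafka's `Timer`, so this is a model of the arithmetic in the hunk above, not the actual change.

```java
import java.time.Duration;

// Hypothetical, dependency-free model of the timeout arithmetic in the diff above.
final class CloseTimeoutSketch {

    // Mirrors createTimerForRequest: the close timer never exceeds requestTimeoutMs.
    static long boundedCloseTimeoutMs(Duration timeout, long requestTimeoutMs) {
        if (timeout.toMillis() < 0)
            throw new IllegalArgumentException("The timeout cannot be negative.");
        return Math.min(timeout.toMillis(), requestTimeoutMs);
    }

    // Mirrors the recomputation in close(Duration, boolean), with the elapsed
    // time supplied by the caller (an assumption made to keep this runnable).
    static long recomputedTimeoutMs(Duration timeout, long requestTimeoutMs, long elapsedMs) {
        long remaining = Math.max(0, timeout.toMillis() - elapsedMs);
        if (remaining > 0) {
            remaining = Math.min(requestTimeoutMs, remaining);
        }
        return remaining;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(60);
        long requestTimeoutMs = 30_000L;

        // With no elapsed time (no coordinator close in between), both agree.
        System.out.println(boundedCloseTimeoutMs(timeout, requestTimeoutMs));
        System.out.println(recomputedTimeoutMs(timeout, requestTimeoutMs, 0L));

        // Only when an earlier step has consumed time does the recomputation differ.
        System.out.println(recomputedTimeoutMs(timeout, requestTimeoutMs, 58_000L));
    }
}
```

With zero elapsed time the two methods return the same value, which is the case being discussed here; they diverge only if some earlier close step has already used part of the timeout.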



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
