This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b1b330e664c MINOR: Followup KAFKA-18193 for null check and error message (#20650)
b1b330e664c is described below

commit b1b330e664ce6380adec873c410a36ef4664206b
Author: Ken Huang <[email protected]>
AuthorDate: Mon Oct 13 00:07:45 2025 +0800

    MINOR: Followup KAFKA-18193 for null check and error message (#20650)
    
    This PR is a follow-up to KAFKA-18193. It addresses the need for a null
    check and an improved error message. Please refer to the previous
    comments and the review of
    https://github.com/apache/kafka/pull/19955#pullrequestreview-3310028108
    for more context.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  8 +++++---
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 23 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 999befa3495..68abb913612 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1523,7 +1523,8 @@ public class KafkaStreams implements AutoCloseable {
         }, clientId + "-CloseThread");
     }
 
-    private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
+    // visible for testing
+    boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
         final long timeoutMs;
         if (timeout.isPresent()) {
             timeoutMs = timeout.get();
@@ -1635,8 +1636,9 @@ public class KafkaStreams implements AutoCloseable {
     public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
         Objects.requireNonNull(options, "options cannot be null");
         final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
-        final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
-        final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
+        final Duration timeout = optionsInternal.timeout().orElseGet(() -> Duration.ofMillis(Long.MAX_VALUE));
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+        final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 1ceebab1cd7..83eeb0befc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -120,6 +120,7 @@ import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
@@ -1297,6 +1298,28 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseDefaultTimeoutForCloseWithNullTimeout() throws Exception {
+        prepareStreams();
+        prepareStreamThread(streamThreadOne, 1);
+        prepareStreamThread(streamThreadTwo, 2);
+        prepareTerminableThread(streamThreadOne);
+
+        final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
+        when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
+
+        final CloseOptions closeOptions = CloseOptions.timeout(null)
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        final KafkaStreams streams = spy(new KafkaStreamsWithTerminableThread(
+                getBuilderWithSource().build(), props, mockClientSupplier, time));
+
+        doReturn(false).when(streams).close(any(Optional.class), any());
+        streams.close(closeOptions);
+
+        verify(streams).close(eq(Optional.of(Long.MAX_VALUE)), eq(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
+    }
+
     @Test
     public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws Exception {
         prepareStreams();

Reply via email to