This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b1b330e664c MINOR: Followup KAFKA-18193 for null check and error
message (#20650)
b1b330e664c is described below
commit b1b330e664ce6380adec873c410a36ef4664206b
Author: Ken Huang <[email protected]>
AuthorDate: Mon Oct 13 00:07:45 2025 +0800
MINOR: Followup KAFKA-18193 for null check and error message (#20650)
This PR is a follow-up to KAFKA-18193. It addresses the need for a null
check and an improved error message. Please refer to the previous
comments and the review of
https://github.com/apache/kafka/pull/19955#pullrequestreview-3310028108
for more context.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 8 +++++---
.../org/apache/kafka/streams/KafkaStreamsTest.java | 23 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 999befa3495..68abb913612 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1523,7 +1523,8 @@ public class KafkaStreams implements AutoCloseable {
}, clientId + "-CloseThread");
}
- private boolean close(final Optional<Long> timeout, final
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
+ // visible for testing
+ boolean close(final Optional<Long> timeout, final
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
final long timeoutMs;
if (timeout.isPresent()) {
timeoutMs = timeout.get();
@@ -1635,8 +1636,9 @@ public class KafkaStreams implements AutoCloseable {
public synchronized boolean close(final
org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
Objects.requireNonNull(options, "options cannot be null");
final CloseOptionsInternal optionsInternal = new
CloseOptionsInternal(options);
- final String msgPrefix =
prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
- final long timeoutMs =
validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
+ final Duration timeout = optionsInternal.timeout().orElseGet(() ->
Duration.ofMillis(Long.MAX_VALUE));
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout,
"timeout");
+ final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 1ceebab1cd7..83eeb0befc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -120,6 +120,7 @@ import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
@@ -1297,6 +1298,28 @@ public class KafkaStreamsTest {
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseDefaultTimeoutForCloseWithNullTimeout() throws
Exception {
+ prepareStreams();
+ prepareStreamThread(streamThreadOne, 1);
+ prepareStreamThread(streamThreadTwo, 2);
+ prepareTerminableThread(streamThreadOne);
+
+ final MockClientSupplier mockClientSupplier =
spy(MockClientSupplier.class);
+ when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
+
+ final CloseOptions closeOptions = CloseOptions.timeout(null)
+
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+ final KafkaStreams streams = spy(new KafkaStreamsWithTerminableThread(
+ getBuilderWithSource().build(), props, mockClientSupplier,
time));
+
+ doReturn(false).when(streams).close(any(Optional.class), any());
+ streams.close(closeOptions);
+
+ verify(streams).close(eq(Optional.of(Long.MAX_VALUE)),
eq(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
+ }
+
@Test
public void
shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws
Exception {
prepareStreams();