This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d087ade527e HOTFIX: Fix StreamThreadTest (#19562)
d087ade527e is described below
commit d087ade527e0679b74fa87d6b22344dc470318aa
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Apr 25 15:03:39 2025 +0200
HOTFIX: Fix StreamThreadTest (#19562)
Commit 732ed06 changed the logic for handling shutdowns. In parallel,
commit 3fae785 introduced a new unit test that checks the shutdown
behavior. That test relied on the old shutdown logic and was therefore
broken by the change in 732ed06.
Reviewers: David Jacot <[email protected]>
---
.../processor/internals/StreamThreadTest.java | 38 ++++++++++++++--------
1 file changed, 24 insertions(+), 14 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 35029519dfb..6fc8b4efb30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -52,6 +53,7 @@ import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -82,7 +84,6 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
@@ -178,6 +179,7 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -640,7 +642,7 @@ public class StreamThreadTest {
thread.setState(State.PARTITIONS_REVOKED);
thread.runOnceWithoutProcessingThreads();
- Mockito.verify(taskManager, Mockito.never()).process(Mockito.anyInt(), Mockito.any());
+ Mockito.verify(taskManager, never()).process(Mockito.anyInt(), Mockito.any());
}
@ParameterizedTest
@@ -3800,7 +3802,7 @@ public class StreamThreadTest {
Map.of(),
Map.of()
);
- final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
+ final Runnable shutdownErrorHook = mock(Runnable.class);
final Properties props = configProps(false, false, false);
final StreamsConfig config = new StreamsConfig(props);
@@ -3819,10 +3821,10 @@ public class StreamThreadTest {
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
- assignmentErrorCode,
+ null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
- null,
+ shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
@@ -3831,11 +3833,15 @@ public class StreamThreadTest {
thread.setState(State.STARTING);
thread.runOnceWithoutProcessingThreads();
- assertEquals(0, assignmentErrorCode.get());
+ verify(shutdownErrorHook, never()).run();
- streamsRebalanceData.requestShutdown();
+ streamsRebalanceData.setStatuses(List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
+ .setStatusDetail("Shutdown requested")
+ ));
thread.runOnceWithoutProcessingThreads();
- assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), assignmentErrorCode.get());
+ verify(shutdownErrorHook).run();
}
@Test
@@ -3850,9 +3856,9 @@ public class StreamThreadTest {
Map.of(),
Map.of()
);
- final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
final Properties props = configProps(false, false, false);
+ final Runnable shutdownErrorHook = mock(Runnable.class);
final StreamsConfig config = new StreamsConfig(props);
thread = new StreamThread(
new MockTime(1),
@@ -3869,10 +3875,10 @@ public class StreamThreadTest {
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
- assignmentErrorCode,
+ null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
- null,
+ shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
@@ -3881,11 +3887,15 @@ public class StreamThreadTest {
thread.setState(State.STARTING);
thread.runOnceWithProcessingThreads();
- assertEquals(0, assignmentErrorCode.get());
+ verify(shutdownErrorHook, never()).run();
- streamsRebalanceData.requestShutdown();
+ streamsRebalanceData.setStatuses(List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
+ .setStatusDetail("Shutdown requested")
+ ));
thread.runOnceWithProcessingThreads();
- assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), assignmentErrorCode.get());
+ verify(shutdownErrorHook).run();
}
@Test