This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d334f60944f MINOR: KStreamRepartitionIntegrationTest bug (#17963)
d334f60944f is described below
commit d334f60944fa51622f0035039fa36d6d98405c59
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 27 16:08:05 2024 -0500
MINOR: KStreamRepartitionIntegrationTest bug (#17963)
The
KStreamRepartitionIntegrationTest.shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining
test was taking two minutes due not reaching an expected condition. By
updating to have the StreamsUncaughtExceptionHandler return a response of
SHUTDOWN_CLIENT the expected ERROR state is now reached. The root cause was
using the Thread.UncaughtExceptionHandler to handle the exception.
Without this fix, the test takes 2 minutes to run, and now it's 1 second.
Reviewers: Matthias Sax <[email protected]>
---
.../KStreamRepartitionIntegrationTest.java | 58 +++++++---------------
1 file changed, 19 insertions(+), 39 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index d60a13915b5..d9c7c91bb5c 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -29,7 +29,6 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -76,6 +75,7 @@ import java.util.stream.IntStream;
import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
+import static
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -168,17 +168,21 @@ public class KStreamRepartitionIntegrationTest {
.to(outputTopic);
final Properties streamsConfiguration =
createStreamsConfig(topologyOptimization);
- builder.build(streamsConfiguration);
-
- startStreams(builder, REBALANCING, ERROR, streamsConfiguration, (t, e)
-> expectedThrowable.set(e));
-
- final String expectedMsg = String.format("Number of partitions [%s] of
repartition topic [%s] " +
- "doesn't match number of
partitions [%s] of the source topic.",
-
inputTopicRepartitionedNumOfPartitions,
-
toRepartitionTopicName(inputTopicRepartitionName),
- topicBNumberOfPartitions);
- assertNotNull(expectedThrowable.get());
- assertTrue(expectedThrowable.get().getMessage().contains(expectedMsg));
+ try (final KafkaStreams ks = new
KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration)) {
+ ks.setUncaughtExceptionHandler(exception -> {
+ expectedThrowable.set(exception);
+ return SHUTDOWN_CLIENT;
+ });
+ ks.start();
+ TestUtils.waitForCondition(() -> ks.state() == ERROR, 30_000,
"Kafka Streams never went into error state");
+ final String expectedMsg = String.format("Number of partitions
[%s] of repartition topic [%s] " +
+ "doesn't match number of partitions [%s] of the
source topic.",
+ inputTopicRepartitionedNumOfPartitions,
+ toRepartitionTopicName(inputTopicRepartitionName),
+ topicBNumberOfPartitions);
+ assertNotNull(expectedThrowable.get());
+
assertTrue(expectedThrowable.get().getMessage().contains(expectedMsg));
+ }
}
@ParameterizedTest
@@ -723,7 +727,7 @@ public class KStreamRepartitionIntegrationTest {
)
);
- kafkaStreamsToClose.close();
+ kafkaStreamsToClose.close(Duration.ofSeconds(5));
sendEvents(
timestamp,
@@ -814,36 +818,12 @@ public class KStreamRepartitionIntegrationTest {
}
private KafkaStreams startStreams(final StreamsBuilder builder, final
Properties streamsConfiguration) throws InterruptedException {
- return startStreams(builder, REBALANCING, RUNNING,
streamsConfiguration, null);
- }
-
- private KafkaStreams startStreams(final StreamsBuilder builder,
- final State expectedOldState,
- final State expectedNewState,
- final Properties streamsConfiguration,
- final Thread.UncaughtExceptionHandler
uncaughtExceptionHandler) throws InterruptedException {
final CountDownLatch latch;
final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
-
- if (uncaughtExceptionHandler == null) {
- latch = new CountDownLatch(1);
- } else {
- latch = new CountDownLatch(2);
- kafkaStreams.setUncaughtExceptionHandler(e -> {
-
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
- latch.countDown();
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- } else if (e instanceof Error) {
- throw (Error) e;
- } else {
- throw new RuntimeException("Unexpected checked exception
caught in the uncaught exception handler", e);
- }
- });
- }
+ latch = new CountDownLatch(1);
kafkaStreams.setStateListener((newState, oldState) -> {
- if (expectedOldState == oldState && expectedNewState == newState) {
+ if (REBALANCING == oldState && RUNNING == newState) {
latch.countDown();
}
});