This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d334f60944f MINOR: KStreamRepartitionIntegrationTest  bug  (#17963)
d334f60944f is described below

commit d334f60944fa51622f0035039fa36d6d98405c59
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 27 16:08:05 2024 -0500

    MINOR: KStreamRepartitionIntegrationTest  bug  (#17963)
    
    The 
KStreamRepartitionIntegrationTest.shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining
 test was taking two minutes due to not reaching an expected condition. By 
updating to have the StreamsUncaughtExceptionHandler return a response of 
SHUTDOWN_CLIENT, the expected ERROR state is now reached. The root cause was 
using the Thread.UncaughtExceptionHandler to handle the exception.
    
    Without this fix, the test takes 2 minutes to run, and now it's 1 second.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../KStreamRepartitionIntegrationTest.java         | 58 +++++++---------------
 1 file changed, 19 insertions(+), 39 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index d60a13915b5..d9c7c91bb5c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -29,7 +29,6 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -76,6 +75,7 @@ import java.util.stream.IntStream;
 import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
 import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
 import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -168,17 +168,21 @@ public class KStreamRepartitionIntegrationTest {
                .to(outputTopic);
 
         final Properties streamsConfiguration = 
createStreamsConfig(topologyOptimization);
-        builder.build(streamsConfiguration);
-
-        startStreams(builder, REBALANCING, ERROR, streamsConfiguration, (t, e) 
-> expectedThrowable.set(e));
-
-        final String expectedMsg = String.format("Number of partitions [%s] of 
repartition topic [%s] " +
-                                                 "doesn't match number of 
partitions [%s] of the source topic.",
-                                                 
inputTopicRepartitionedNumOfPartitions,
-                                                 
toRepartitionTopicName(inputTopicRepartitionName),
-                                                 topicBNumberOfPartitions);
-        assertNotNull(expectedThrowable.get());
-        assertTrue(expectedThrowable.get().getMessage().contains(expectedMsg));
+        try (final KafkaStreams ks = new 
KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration)) {
+            ks.setUncaughtExceptionHandler(exception -> {
+                expectedThrowable.set(exception);
+                return SHUTDOWN_CLIENT;
+            });
+            ks.start();
+            TestUtils.waitForCondition(() -> ks.state() == ERROR, 30_000, 
"Kafka Streams never went into error state");
+            final String expectedMsg = String.format("Number of partitions 
[%s] of repartition topic [%s] " +
+                            "doesn't match number of partitions [%s] of the 
source topic.",
+                    inputTopicRepartitionedNumOfPartitions,
+                    toRepartitionTopicName(inputTopicRepartitionName),
+                    topicBNumberOfPartitions);
+            assertNotNull(expectedThrowable.get());
+            
assertTrue(expectedThrowable.get().getMessage().contains(expectedMsg));
+        }
     }
 
     @ParameterizedTest
@@ -723,7 +727,7 @@ public class KStreamRepartitionIntegrationTest {
             )
         );
 
-        kafkaStreamsToClose.close();
+        kafkaStreamsToClose.close(Duration.ofSeconds(5));
 
         sendEvents(
             timestamp,
@@ -814,36 +818,12 @@ public class KStreamRepartitionIntegrationTest {
     }
 
     private KafkaStreams startStreams(final StreamsBuilder builder, final 
Properties streamsConfiguration) throws InterruptedException {
-        return startStreams(builder, REBALANCING, RUNNING, 
streamsConfiguration, null);
-    }
-
-    private KafkaStreams startStreams(final StreamsBuilder builder,
-                                      final State expectedOldState,
-                                      final State expectedNewState,
-                                      final Properties streamsConfiguration,
-                                      final Thread.UncaughtExceptionHandler 
uncaughtExceptionHandler) throws InterruptedException {
         final CountDownLatch latch;
         final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
-
-        if (uncaughtExceptionHandler == null) {
-            latch = new CountDownLatch(1);
-        } else {
-            latch = new CountDownLatch(2);
-            kafkaStreams.setUncaughtExceptionHandler(e -> {
-                
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
-                latch.countDown();
-                if (e instanceof RuntimeException) {
-                    throw (RuntimeException) e;
-                } else if (e instanceof Error) {
-                    throw (Error) e;
-                } else {
-                    throw new RuntimeException("Unexpected checked exception 
caught in the uncaught exception handler", e);
-                }
-            });
-        }
+        latch = new CountDownLatch(1);
 
         kafkaStreams.setStateListener((newState, oldState) -> {
-            if (expectedOldState == oldState && expectedNewState == newState) {
+            if (REBALANCING == oldState && RUNNING == newState) {
                 latch.countDown();
             }
         });

Reply via email to