ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553005186



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
 
             final Optional<String> name = kafkaStreams.addStreamThread();
 
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                         .map(ThreadMetadata::threadName).anyMatch(t -> 
t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), 
equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> 
t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new 
String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == 
KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+            
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], 
equalTo(appId));
+            assertThat(kafkaStreams.localThreadsMetadata().size(), 
equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveThreads() throws InterruptedException {
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+            
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+            final CountDownLatch latch = new CountDownLatch(2);
+            final Thread one = adjustCountHelperThread(kafkaStreams, 4, latch);
+            final Thread two = adjustCountHelperThread(kafkaStreams, 6, latch);
+            two.start();
+            one.start();
+            latch.await(30, TimeUnit.SECONDS);
+            assertThat(kafkaStreams.localThreadsMetadata().size(), 
equalTo(oldThreadCount));
+        }

Review comment:
       Awesome, thanks for adding this test. One small suggestion would be to 
wait for the client to get back to RUNNING at the end, so we can verify that 
everything went smoothly with the add/remove. I think this would be good to 
do in all of these tests, actually.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to