ableegoldman commented on a change in pull request #9695: URL: https://github.com/apache/kafka/pull/9695#discussion_r553005186
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ########## @@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception { final Optional<String> name = kafkaStreams.addStreamThread(); - assertThat(name, CoreMatchers.not(Optional.empty())); + assertThat(name, not(Optional.empty())); TestUtils.waitForCondition( () -> kafkaStreams.localThreadsMetadata().stream().sequential() .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))), "Wait for the thread to be added" ); assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1)); - assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"})); - TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running"); + assertThat( + kafkaStreams + .localThreadsMetadata() + .stream() + .map(t -> t.threadName().split("-StreamThread-")[1]) + .sorted().toArray(), + equalTo(new String[] {"1", "2", "3"}) + ); + TestUtils.waitForCondition( + () -> kafkaStreams.state() == KafkaStreams.State.RUNNING, + "Kafka Streams client did not reach state RUNNING" + ); + } + } + + @Test + public void shouldRemoveStreamThread() throws Exception { + try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { + StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); + final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); + assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId)); + assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1)); + } + } + + @Test + public void shouldAddAndRemoveThreads() throws InterruptedException { + try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { + 
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); + final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); + final CountDownLatch latch = new CountDownLatch(2); + final Thread one = adjustCountHelperThread(kafkaStreams, 4, latch); + final Thread two = adjustCountHelperThread(kafkaStreams, 6, latch); + two.start(); + one.start(); + latch.await(30, TimeUnit.SECONDS); + assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount)); + } Review comment: Awesome, thanks for adding this test. One small suggestion would be to wait for the client to get back to RUNNING at the end, so we can verify that everything did go smoothly with the add/remove. I think this would be good to do in all of these tests, actually. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org