ableegoldman commented on a change in pull request #9695: URL: https://github.com/apache/kafka/pull/9695#discussion_r553006191
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
             final Optional<String> name = kafkaStreams.addStreamThread();
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                     .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {

Review comment: One more corner case we should add test coverage for is what happens when we get down to zero threads. Two things to verify that I can think of are: (1) that the client stays in RUNNING, and (2) that we can add more threads again after we've been idling with zero threads for a short while (and, as always, that the client goes into REBALANCING --> RUNNING after adding the threads).

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org