ableegoldman commented on a change in pull request #9695: URL: https://github.com/apache/kafka/pull/9695#discussion_r553008933
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ########## @@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception { final Optional<String> name = kafkaStreams.addStreamThread(); - assertThat(name, CoreMatchers.not(Optional.empty())); + assertThat(name, not(Optional.empty())); TestUtils.waitForCondition( () -> kafkaStreams.localThreadsMetadata().stream().sequential() .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))), "Wait for the thread to be added" ); assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1)); - assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"})); - TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running"); + assertThat( + kafkaStreams + .localThreadsMetadata() + .stream() + .map(t -> t.threadName().split("-StreamThread-")[1]) + .sorted().toArray(), + equalTo(new String[] {"1", "2", "3"}) + ); + TestUtils.waitForCondition( + () -> kafkaStreams.state() == KafkaStreams.State.RUNNING, + "Kafka Streams client did not reach state RUNNING" + ); + } + } + + @Test + public void shouldRemoveStreamThread() throws Exception { Review comment: Ah, right. Would it make sense to add this test in the above PR instead? Or, as a middle ground, you could add the test in this PR and just leave out (1) for now, then add that check in the other PR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org