mjsax commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1260139834
########## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ########## @@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception { }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag."); // Kill instance, delete state to force restoration. - assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60))); + assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60))); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder()) .map(Path::toFile) .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted")); + } finally { + streams.get().close(); Review Comment: Not sure why we need `AtomicReference`? Can you elaborate? Why does this not work? ``` KafkaStream streams; try { stream = new KafkaStreams(...); ... ``` Btw: `streams.get()` could return `null` in case we fail before calling `set(...)`. Need a `null`-check here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org