mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260139834


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws 
Exception {
             }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
             // Kill instance, delete state to force restoration.
-            assertThat("Streams instance did not close within timeout", 
streams.close(Duration.ofSeconds(60)));
+            assertThat("Streams instance did not close within timeout", 
streams.get().close(Duration.ofSeconds(60)));
             IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
             Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " 
could not be deleted"));
+        } finally {
+            streams.get().close();

Review Comment:
   Not sure why we need `AtomicReference`? Can you elaborate?
   
   Why does this not work?
   ```
   KafkaStream streams;
   try {
     stream = new KafkaStreams(...);
     ...
   ```
   
   Btw: `streams.get()` could return `null` in case we fail before calling 
`set(...)`. Need a `null`-check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to