wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r551458558
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception {
                     "Thread never stopped.");
                 streams.threads.get(i).join();
             }
-            TestUtils.waitForCondition(

Review comment:
That's probably a good idea.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
-            waitForStateTransition(stateTransitions2, CRASH);
-
             commitErrorInjectedClient2.set(false);
             stateTransitions2.clear();
             streams2Alpha.close();
-            waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
My thought was that we are calling close, so it is just going to be closed as the state transitions are cleared. This is the test that made me want you to review the PR. I was worried that I might have broken the integrity of the tests, as it is not behaving as I expected. However, no other test was having this problem, so I was not sure if I was understanding it correctly.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
     }
     @Test
-    @SuppressWarnings("deprecation")

Review comment:
It made it so that the new default was not used until we updated the Error transition, as we are doing in this PR.
:)

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
I don't think that we should return true. A user would then expect the state to change to `NOT_RUNNING` and could be stuck waiting on that. Whereas if we return false, they might retry and get stuck there. I don't know which is the better problem to have, but I think that changing the meaning of this return value won't add any more clarity to the situation.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -557,12 +490,12 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
                 "Thread never stopped.");
             globalStreamThread.join();
-            assertEquals(streams.state(), KafkaStreams.State.ERROR);
+            assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);
         } finally {
             streams.close();
         }
-        assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+        assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);

Review comment:
It will. We don't strictly need to test it here, as that is tested elsewhere, but we can add it for clarity.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -226,7 +226,8 @@
         RUNNING(1, 2, 3, 5),    // 2
         PENDING_SHUTDOWN(4),    // 3
         NOT_RUNNING,            // 4
-        ERROR(3);               // 5
+        PENDING_ERROR(6),       // 5
+        ERROR;                  // 6

Review comment:
Yay, ASCII art!
Good catch, I forgot about that. When I build the docs locally it looks about right.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
My theory (based on logs) is that the cleaner thread was sometimes activating and removing the segment before it should, so the record that makes the thread crash was sometimes not being consumed by the recovery thread. I just changed the timestamps so that the cleaner thread would not consider them old, and so that if it did activate it would not clean them.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -128,8 +128,8 @@ public void shouldShutdownThreadUsingOldHandler() throws InterruptedException {
         TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
         // should call the UncaughtExceptionHandler after rebalancing to another thread
         TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time");
-        // the stream should now turn into ERROR state after 2 threads are dead
-        waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
+        // there is no threads running but the client is still in running
+        waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
We can, but we should already be able to see from `counter` that the 2 threads failed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
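
For readers following the `KafkaStreams.State` hunk quoted above, here is a minimal, self-contained sketch of how an ordinal-based transition table of that shape can be checked. It is not the code from the PR: `SketchState` and `StateSketch` are hypothetical names, and the transitions listed for `CREATED` and `REBALANCING` are assumptions, since those two lines do not appear in the quoted diff. Only the entries from `RUNNING` downward are taken from the hunk.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the state enum in the quoted hunk.
// Each constant lists the ordinals of the states it may move to.
enum SketchState {
    CREATED(1, 3),          // 0 (assumed, not shown in the quoted diff)
    REBALANCING(2, 3, 5),   // 1 (assumed, not shown in the quoted diff)
    RUNNING(1, 2, 3, 5),    // 2
    PENDING_SHUTDOWN(4),    // 3
    NOT_RUNNING,            // 4 (terminal: no outgoing transitions)
    PENDING_ERROR(6),       // 5
    ERROR;                  // 6 (terminal: no outgoing transitions)

    private final Set<Integer> validTransitions = new HashSet<>();

    SketchState(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    boolean isValidTransition(final SketchState newState) {
        return validTransitions.contains(newState.ordinal());
    }
}

// Hypothetical client that starts in RUNNING and applies the table above.
final class StateSketch {
    private SketchState state = SketchState.RUNNING;

    SketchState state() {
        return state;
    }

    // Moves to newState only if the current state lists it as a valid target.
    boolean setState(final SketchState newState) {
        if (!state.isValidTransition(newState)) {
            return false;
        }
        state = newState;
        return true;
    }

    public static void main(final String[] args) {
        final StateSketch client = new StateSketch();
        // RUNNING does not list ERROR (6), so a direct jump is rejected.
        System.out.println(client.setState(SketchState.ERROR));
        // RUNNING lists PENDING_ERROR (5).
        System.out.println(client.setState(SketchState.PENDING_ERROR));
        // PENDING_ERROR lists only ERROR (6).
        System.out.println(client.setState(SketchState.ERROR));
        // ERROR lists nothing, so it cannot move to NOT_RUNNING.
        System.out.println(client.setState(SketchState.NOT_RUNNING));
    }
}
```

In this sketch, `ERROR` has no outgoing transitions, which is the situation the review comment on `close()` is discussing: once the client is in `ERROR`, it cannot reach `NOT_RUNNING`, so a caller waiting for that state would wait indefinitely.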