wcarlson5 commented on a change in pull request #9720: URL: https://github.com/apache/kafka/pull/9720#discussion_r540486098
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -1142,11 +1128,21 @@ private Thread shutdownHelper(final boolean error) { metrics.close(); if (!error) { setState(State.NOT_RUNNING); + } else { + setState(State.ERROR); } }, "kafka-streams-close-thread"); } private boolean close(final long timeoutMs) { + if (state == State.ERROR) { Review comment: We want `close()` to be idempotent and not throw an exception, but we will still log a warning. The warning applies only to `close()`, which is why these logs are not in the `setState()` method. ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -594,9 +581,8 @@ public synchronized void onChange(final Thread thread, if (newState == GlobalStreamThread.State.RUNNING) { maybeSetRunning(); } else if (newState == GlobalStreamThread.State.DEAD) { - if (setState(State.ERROR)) { Review comment: We will be doing the same thing, but closing the client for now. Maybe a way to replace the globalThread will be added later. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ########## @@ -411,7 +405,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { errorInjectedClient1.set(false); stateTransitions1.clear(); streams1Alpha.close(); - waitForStateTransition(stateTransitions1, CLOSE_CRASHED); + final KafkaStreams finalStreams1Alpha = streams1Alpha; + waitForCondition(() -> finalStreams1Alpha.state() == State.ERROR, "Stream did not go to ERROR"); Review comment: Just wait for ERROR, because the CLOSE_CRASHED transition is no longer expected. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java ########## @@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() { } @Test - @SuppressWarnings("deprecation") Review comment: Remove the deprecation suppression. ########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -532,22 
+537,6 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState this.threadStatesLock = new Object(); } - /** - * If all threads are dead set to ERROR - */ - private void maybeSetError() { Review comment: This will not be needed with the new error definition. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -546,7 +546,7 @@ public void run() { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - void runLoop() { + boolean runLoop() { Review comment: This will let Streams shut down uncleanly when in EOS mode. ########## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ########## @@ -418,70 +418,6 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); } - @Test Review comment: This test is for the functionality we are removing. ########## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ########## @@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception { "Thread never stopped."); streams.threads.get(i).join(); } - TestUtils.waitForCondition( Review comment: There is no transition to ERROR here anymore. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ########## @@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, } @Test - @Deprecated Review comment: Remove the deprecated handler. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java ########## @@ -139,7 +139,7 @@ public void shouldShutdownClient() throws InterruptedException { 
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); produceMessages(0L, inputTopic, "A"); - waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); + waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); Review comment: When the client is shut down, it now goes to ERROR. ########## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ########## @@ -625,8 +561,7 @@ public void shouldNotAddThreadWhenError() { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); final int oldSize = streams.threads.size(); streams.start(); - streamThreadOne.shutdown(); - streamThreadTwo.shutdown(); + globalStreamThread.shutdown(); Review comment: Removing stream threads does not put the client into ERROR anymore; shutting down the global thread does. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ########## @@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, } @Test - @Deprecated public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception { + final long time = System.currentTimeMillis(); Review comment: There was a problem with the cleaner thread sometimes wiping out old segments because they were expired. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ########## @@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForStateTransition(stateTransitions2, CRASH); - commitErrorInjectedClient2.set(false); stateTransitions2.clear(); streams2Alpha.close(); - waitForStateTransition(stateTransitions2, CLOSE_CRASHED); Review comment: You no longer need to wait for the CLOSE_CRASHED transition. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org