wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r540486098



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1142,11 +1128,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {

Review comment:
       We want `close()` to be idempotent and not throw an exception, but we 
will still log a warning. The warning applies only to `close()`, which is why 
these logs are not in the `setState()` method.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -594,9 +581,8 @@ public synchronized void onChange(final Thread thread,
                     if (newState == GlobalStreamThread.State.RUNNING) {
                         maybeSetRunning();
                     } else if (newState == GlobalStreamThread.State.DEAD) {
-                        if (setState(State.ERROR)) {

Review comment:
       We will be doing the same thing, but by closing the client for now. 
Maybe an option to replace the global thread will be added later.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -411,7 +405,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws 
Exception {
                 errorInjectedClient1.set(false);
                 stateTransitions1.clear();
                 streams1Alpha.close();
-                waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
+                final KafkaStreams finalStreams1Alpha = streams1Alpha;
+                waitForCondition(() -> finalStreams1Alpha.state() == 
State.ERROR, "Stream did not go to ERROR");

Review comment:
       Just wait for ERROR, because the CLOSE_CRASHED transition no longer happens.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
     }
 
     @Test
-    @SuppressWarnings("deprecation")

Review comment:
       remove deprecation

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -532,22 +537,6 @@ public void setGlobalStateRestoreListener(final 
StateRestoreListener globalState
             this.threadStatesLock = new Object();
         }
 
-        /**
-         * If all threads are dead set to ERROR
-         */
-        private void maybeSetError() {

Review comment:
       This will not be needed with the new error definition

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -546,7 +546,7 @@ public void run() {
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException      if the store's change log does not 
contain the partition
      */
-    void runLoop() {
+    boolean runLoop() {

Review comment:
       This will let Streams shut down uncleanly when in EOS mode.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -418,70 +418,6 @@ public void 
stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In
         Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
-    @Test

Review comment:
       This test is for the functionality we are removing

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception {
                     "Thread never stopped.");
                 streams.threads.get(i).join();
             }
-            TestUtils.waitForCondition(

Review comment:
       There is no transition to ERROR here anymore.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final 
String stateDirPath,
     }
 
     @Test
-    @Deprecated

Review comment:
       remove deprecated handler

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -139,7 +139,7 @@ public void shouldShutdownClient() throws 
InterruptedException {
             
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
             produceMessages(0L, inputTopic, "A");
-            waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+            waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, DEFAULT_DURATION);

Review comment:
       When the client is shut down, it now goes to ERROR.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -625,8 +561,7 @@ public void shouldNotAddThreadWhenError() {
         final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         final int oldSize = streams.threads.size();
         streams.start();
-        streamThreadOne.shutdown();
-        streamThreadTwo.shutdown();
+        globalStreamThread.shutdown();

Review comment:
       Removing stream threads no longer puts the client in ERROR. Shutting 
down the global stream thread does.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final 
String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() 
throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       There was a problem with the cleaner thread sometimes wiping out old 
segments because they were expired

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws 
Exception {
 
                 
assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       You no longer need to wait for the CLOSE_CRASHED transition.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to