wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r551458558



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception {
                     "Thread never stopped.");
                 streams.threads.get(i).join();
             }
-            TestUtils.waitForCondition(

Review comment:
       That's probably a good idea.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       My thinking was that we are calling close, so the client is just going to be closed, since the state transitions have been cleared. This is the test that made me want you to review the PR: I was worried that I might have broken the integrity of the tests, since it is not behaving as I expected. However, no other test was having this problem, so I was not sure whether I was understanding it correctly.
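A minimal, self-contained sketch of the pattern this reasoning relies on: a listener records every `(old -> new)` transition into a list, the test clears that list, and anything recorded afterwards comes only from the `close()` call. `TransitionRecorderSketch`, `TransitionRecorder`, `FakeClient`, and the simplified `State` enum are hypothetical stand-ins, not the real `KafkaStreams` or Kafka test-utility classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TransitionRecorderSketch {
    // Hypothetical, simplified client states; not the real KafkaStreams.State.
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Records every (old -> new) transition, similar in spirit to the test's state listener.
    static final class TransitionRecorder {
        // Thread-safe list, because a real listener may be called from other threads.
        private final List<String> transitions = Collections.synchronizedList(new ArrayList<>());

        void onChange(final State newState, final State oldState) {
            transitions.add(oldState + "->" + newState);
        }

        void clear() {
            transitions.clear();
        }

        List<String> snapshot() {
            synchronized (transitions) {
                return new ArrayList<>(transitions);
            }
        }
    }

    // Hypothetical client that reports each state change to the recorder.
    static final class FakeClient {
        private State state = State.CREATED;
        private final TransitionRecorder recorder;

        FakeClient(final TransitionRecorder recorder) {
            this.recorder = recorder;
        }

        private void setState(final State newState) {
            final State oldState = state;
            state = newState;
            recorder.onChange(newState, oldState);
        }

        void start() {
            setState(State.RUNNING);
        }

        void close() {
            setState(State.PENDING_SHUTDOWN);
            setState(State.NOT_RUNNING);
        }
    }

    // Start, clear the history (like stateTransitions2.clear()), then close.
    static List<String> transitionsAfterClearAndClose() {
        final TransitionRecorder recorder = new TransitionRecorder();
        final FakeClient client = new FakeClient(recorder);
        client.start();   // recorded, then discarded by clear()
        recorder.clear();
        client.close();   // only the close transitions remain in the list
        return recorder.snapshot();
    }

    public static void main(final String[] args) {
        System.out.println(transitionsAfterClearAndClose());
    }
}
```

Once the list has been cleared, waiting for any sequence that includes transitions from before the `clear()` cannot succeed; only what `close()` produces is left to match.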

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
     }
 
     @Test
-    @SuppressWarnings("deprecation")

Review comment:
       It made it so that the new default was not used until we updated the ERROR transition, as we are doing in this PR. :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       I don't think we should return true. A user would then expect the state to change to `NOT_RUNNING` and could get stuck waiting on that. If we return false instead, they might retry and get stuck there.
   
   I don't know which is the better problem to have, but I think changing the meaning of this return value won't add any clarity to the situation.
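The trade-off can be sketched with a small standalone model of the shutdown path in the diff: the close thread ends in `ERROR` instead of `NOT_RUNNING` when the shutdown was caused by an error, and a later `close()` on an `ERROR` client returns `false` right away. `CloseReturnValueSketch`, `ClientModel`, `shutdown(boolean)`, and `closeWithRetries` are hypothetical simplifications, not the real `KafkaStreams` internals; passing through `PENDING_ERROR` on the error path is an assumption that matches the test change asserting `PENDING_ERROR`.

```java
public class CloseReturnValueSketch {
    // Simplified subset of client states; not the real KafkaStreams.State.
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Hypothetical model of the client's shutdown path.
    static final class ClientModel {
        private volatile State state = State.RUNNING;

        State state() {
            return state;
        }

        // Like shutdownHelper(error) in the diff: end in ERROR instead of
        // NOT_RUNNING when the shutdown was caused by an error.
        // (Assumption: the error path passes through PENDING_ERROR first.)
        void shutdown(final boolean error) {
            state = error ? State.PENDING_ERROR : State.PENDING_SHUTDOWN;
            // ... stream threads, global thread and metrics would be closed here ...
            state = error ? State.ERROR : State.NOT_RUNNING;
        }

        // Like the new early return in close(): an ERROR client is already
        // terminal, so close() reports false rather than success.
        boolean close() {
            if (state == State.ERROR) {
                return false;
            }
            if (state == State.NOT_RUNNING) {
                return true; // already closed cleanly
            }
            shutdown(false);
            return true;
        }
    }

    // Hypothetical caller: retry close() a bounded number of times, but give up
    // as soon as the client reports the terminal ERROR state.
    static boolean closeWithRetries(final ClientModel client, final int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (client.close()) {
                return true;
            }
            if (client.state() == State.ERROR) {
                return false; // retrying can never leave a terminal state
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final ClientModel healthy = new ClientModel();
        System.out.println(healthy.close() + " " + healthy.state());

        final ClientModel failed = new ClientModel();
        failed.shutdown(true);
        System.out.println(closeWithRetries(failed, 5) + " " + failed.state());
    }
}
```

In this model, pairing the `false` return with a `state()` check is what keeps a retrying caller from looping; a caller that only waits for `NOT_RUNNING` would still wait forever on the failed client.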

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -557,12 +490,12 @@ public void testStateGlobalThreadClose() throws Exception {
                () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
                 "Thread never stopped.");
             globalStreamThread.join();
-            assertEquals(streams.state(), KafkaStreams.State.ERROR);
+            assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);
         } finally {
             streams.close();
         }
 
-        assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+        assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);

Review comment:
       It will. We don't strictly need to test it here, since that is tested elsewhere, but we can add it for clarity.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -226,7 +226,8 @@
         RUNNING(1, 2, 3, 5),    // 2
         PENDING_SHUTDOWN(4),    // 3
         NOT_RUNNING,            // 4
-        ERROR(3);               // 5
+        PENDING_ERROR(6),       // 5
+        ERROR;               // 6

Review comment:
       Yay, ASCII art! Good catch, I forgot about that.
   
   When I build the docs locally, it looks about right:
   
   
![image](https://user-images.githubusercontent.com/18128741/103559186-9815d680-4e6a-11eb-8e4f-3db522102ff6.png)
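The transition table in the enum can also be sanity-checked in code. Below is a minimal standalone copy of the enum from the diff with an ordinal-based `isValidTransition` check. The `CREATED` and `REBALANCING` entries (ordinals 0 and 1) are not shown in the hunk, so their transitions here are an assumption, and `StreamsStateSketch` is a hypothetical class rather than the real `KafkaStreams`. Printing the table shows that `ERROR` is now terminal (previously it allowed a transition to `PENDING_SHUTDOWN`, ordinal 3) and that the only way into it is through `PENDING_ERROR`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class StreamsStateSketch {
    // Transition table from the diff; each argument is the ordinal of a state
    // this state may move to. CREATED and REBALANCING are assumed (not in the hunk).
    enum State {
        CREATED(1, 3),          // 0
        REBALANCING(2, 3, 5),   // 1
        RUNNING(1, 2, 3, 5),    // 2
        PENDING_SHUTDOWN(4),    // 3
        NOT_RUNNING,            // 4
        PENDING_ERROR(6),       // 5
        ERROR;                  // 6

        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        boolean isValidTransition(final State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }

    // Print every state together with the states it may transition to.
    public static void main(final String[] args) {
        for (final State from : State.values()) {
            final StringBuilder targets = new StringBuilder();
            for (final State to : State.values()) {
                if (from.isValidTransition(to)) {
                    targets.append(' ').append(to);
                }
            }
            System.out.println(from + " ->" + targets);
        }
    }
}
```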
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       My theory (based on the logs) is that the cleaner thread was sometimes activating and removing the segment before it should have, so the record that makes the thread crash was sometimes not consumed by the recovery thread. I just changed the timestamps so that the cleaner thread would not consider the records old, and even if it did activate, it would not clean them.
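The timestamp reasoning boils down to an age check: a retention-based cleaner treats data as expired once its timestamp falls more than a retention period behind the current time. Records stamped with a small, fixed timestamp can therefore look expired immediately, while records stamped from `System.currentTimeMillis()` (as the new `time` variable does) stay inside the window for the duration of the test. `RetentionCheckSketch`, `isExpired`, the one-day retention, and the fixed `0L` timestamp are all hypothetical; the original timestamp values are not shown here, and the real cleanup logic has more conditions than this.

```java
public class RetentionCheckSketch {
    // Hypothetical rule: data is eligible for cleanup once it is older than the retention period.
    static boolean isExpired(final long recordTimestampMs, final long nowMs, final long retentionMs) {
        return nowMs - recordTimestampMs > retentionMs;
    }

    public static void main(final String[] args) {
        final long retentionMs = 24L * 60 * 60 * 1000; // hypothetical one-day retention
        final long now = System.currentTimeMillis();

        final long fixedOldTimestamp = 0L; // hypothetical hard-coded timestamp
        final long freshTimestamp = now;   // like `final long time = System.currentTimeMillis();`

        System.out.println("fixed timestamp expired: " + isExpired(fixedOldTimestamp, now, retentionMs));
        System.out.println("fresh timestamp expired: " + isExpired(freshTimestamp, now, retentionMs));
    }
}
```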

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -128,8 +128,8 @@ public void shouldShutdownThreadUsingOldHandler() throws InterruptedException {
             TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
             // should call the UncaughtExceptionHandler after rebalancing to another thread
             TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time");
-            // the stream should now turn into ERROR state after 2 threads are dead
-            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
+            // there is no threads running but the client is still in running
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       We can, but we should already be able to see from `counter` that the 2 threads failed.
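A standalone sketch of why `counter` is enough: each failing thread bumps a shared `AtomicInteger` from its uncaught-exception handler, and a polling wait confirms that both threads failed without looking at the client's state at all. `HandlerCounterSketch`, `waitFor` (a simplified, hypothetical stand-in for `TestUtils.waitForCondition`), and `runFailingThreads` are illustrative only; unlike the real test, the two threads here fail concurrently rather than one after a rebalance.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class HandlerCounterSketch {
    // Simplified polling helper, similar in spirit to TestUtils.waitForCondition.
    static boolean waitFor(final BooleanSupplier condition, final long timeoutMs) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(10);
        }
        return condition.getAsBoolean();
    }

    // Starts `threads` workers that each throw, and counts how often the handler runs.
    static int runFailingThreads(final int threads) throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            final Thread worker = new Thread(() -> {
                throw new IllegalStateException("injected failure");
            });
            // Old-style per-thread handler: only records that it was called.
            worker.setUncaughtExceptionHandler((t, e) -> counter.incrementAndGet());
            worker.start();
        }
        if (!waitFor(() -> counter.get() == threads, 5_000)) {
            throw new AssertionError("handler was not called " + threads + " times");
        }
        return counter.get();
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("handler calls: " + runFailingThreads(2));
    }
}
```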




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

