vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r453155949



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -648,7 +651,9 @@ void runOnce() {
 
         // only try to initialize the assigned tasks
         // if the state is still in PARTITION_ASSIGNED after the poll call
-        if (state == State.PARTITIONS_ASSIGNED) {
+        if (state == State.PARTITIONS_ASSIGNED
+            || state == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {

Review comment:
       The flaky test was related to us going into this block from other 
states. I finally got a clue when one of the tests failed on "invalid 
transition from PARTITIONS_REVOKED to RUNNING". I'm not sure how, exactly, but 
I think the shutdown test that failed on ConcurrentModificationException was 
also related, probably due to the test invoking the 
handleAssignment/Revocation/Lost methods from a different thread (which can 
normally never happen).
   
   Anyway, my prior code only intended to add the _self_ transition, but failed 
to make sure we were actually in a self-transition. It's fixed now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to