showuon commented on pull request #9733: URL: https://github.com/apache/kafka/pull/9733#issuecomment-749913992
@mjsax, thanks for your comments. Let me answer your questions below.

> You originally observed a test failure based on missing retries when trying to get the state store. This should be fixed already.

--> Right! The failure when trying to get the state store is fixed now.

> did you observe more test failures that indicate that we need the proposed changes?

--> Yes, I observed test failures on your PR to 2.6 (i.e. https://github.com/apache/kafka/pull/9690#issuecomment-747243284). This test failure should also happen in the `trunk` branch. It is caused by the streams still being under an **unstable** assignment, not a stable one yet.

> For this case, we know that the state must be in rebalancing (because Kafka Streams would transit to REBALANCING before the StableAssignmentListener is called)

--> No, that's not what I observed. What I observed is that the `StableAssignmentListener` only signals that the assignment is complete; the streams haven't handled the assignment yet. Please check the following logs:

```
2020-12-23T11:35:01.361+0800 [DEBUG] [TestEventLogger] !!! onAssignmentComplete
...
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,703] INFO stream-thread [appDir2-StreamThread-1] Handle new assignment with:
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] New active tasks: [0_0, 0_2]
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] New standby tasks: []
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] Existing active tasks: []
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] Existing standby tasks: [0_0, 0_2] (org.apache.kafka.streams.processor.internals.TaskManager:255)
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,703] INFO stream-thread [appDir1-StreamThread-1] Handle new assignment with:
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] New active tasks: [0_1, 0_3]
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger] New standby tasks: []
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger] Existing active tasks: [0_1, 0_3]
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger] Existing standby tasks: [] (org.apache.kafka.streams.processor.internals.TaskManager:255)
...
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,703] INFO stream-thread [appDir1-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,703] INFO stream-client [appDir1] State transition from RUNNING to REBALANCING (org.apache.kafka.streams.KafkaStreams:298)
...
2020-12-23T11:35:19.711+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,711] INFO stream-thread [appDir2-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:19.711+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,711] INFO stream-client [appDir2] State transition from RUNNING to REBALANCING (org.apache.kafka.streams.KafkaStreams:298)
...
2020-12-23T11:35:19.776+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,776] INFO stream-thread [appDir1-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:19.776+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:19,776] INFO stream-client [appDir1] State transition from REBALANCING to RUNNING (org.apache.kafka.streams.KafkaStreams:298)
...
2020-12-23T11:35:20.028+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:20,028] INFO stream-thread [appDir2-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:20.028+0800 [DEBUG] [TestEventLogger] [2020-12-23 11:35:20,028] INFO stream-client [appDir2] State transition from REBALANCING to RUNNING (org.apache.kafka.streams.KafkaStreams:298)
```

The line `!!! onAssignmentComplete` is the callback we got from `onAssignmentComplete`. After that, the stream threads **handle the new assignments**; next, each stream client transitions to REBALANCING; and finally each one returns to RUNNING once the rebalance completes.

That's the reason why the PR to 2.6 (#9690) keeps encountering test failures: we're still under an "unstable" assignment (that is, `onAssignmentComplete` has just fired, but the stable rebalancing stage hasn't been entered yet). In 2.6, the unstable assignment will sometimes be an empty assignment, which is why the test is flaky. In 2.7 and later the unstable assignment is never empty, but we still might not be in a **stable** RUNNING state, and the assignment might still change after the stable assignment. That's why I think we should fix this issue, too.

Thank you.
Happy Holidays~

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
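
To illustrate the ordering described in the comment (assignment callback first, then REBALANCING, then RUNNING), here is a minimal, self-contained sketch of how a test could wait for the REBALANCING -> RUNNING transition instead of relying on the assignment callback alone. It is not code from the PR or from Kafka: `StableRunningTracker`, its `State` enum, and `awaitStable` are hypothetical names, and the `onChange(newState, oldState)` shape is only assumed to resemble a Kafka Streams state listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka: tracks client state transitions
// and reports when a rebalance has actually finished.
class StableRunningTracker {
    // Assumption: only the two client states that appear in the logs are modeled.
    enum State { RUNNING, REBALANCING }

    private final CountDownLatch stable = new CountDownLatch(1);

    // Intended to be invoked on every client state transition.
    // Only a REBALANCING -> RUNNING transition counts as "stable";
    // being in RUNNING before the rebalance started does not.
    void onChange(State newState, State oldState) {
        if (oldState == State.REBALANCING && newState == State.RUNNING) {
            stable.countDown();
        }
    }

    // Blocks for up to timeoutMs; returns true once the rebalance has completed.
    boolean awaitStable(long timeoutMs) {
        try {
            return stable.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

In a test, one tracker per client would be created before the rebalance is triggered, and assertions on the assignment would run only after `awaitStable` returns true for every client.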