ableegoldman commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593523423



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final 
Collection<TopicPartition> partitions) {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
!partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been 
told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to 
anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || 
streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
       I think this is the correct order (assuming you mean the order of 
`streamThread.setState(State.PARTITIONS_REVOKED) != null` relative to 
`streamThread.state() == State.PENDING_SHUTDOWN`?) -- if the thread is not in 
PENDING_SHUTDOWN when it reaches this line, the first condition should return 
true, which is what we want even if it does get transitioned to 
PENDING_SHUTDOWN immediately after the transition to PARTITIONS_REVOKED.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to