wcarlson5 commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593517811



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
       Do we need to be concerned about the order in which these execute?
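
       To make the ordering question concrete, here is a rough sketch, not the actual Kafka code. `ToyThread` and both helper methods are hypothetical, and the sketch assumes `setState` returns the previous state on success and `null` when the transition is rejected, and that it rejects any move out of PENDING_SHUTDOWN other than to DEAD. Since `||` evaluates left to right and short-circuits, the two orderings differ only in whether `setState` is attempted before or after the shutdown check.

```java
public class RevocationOrderSketch {
    public enum State { RUNNING, PARTITIONS_REVOKED, PENDING_SHUTDOWN, DEAD }

    // Hypothetical stand-in for the stream thread; not Kafka's StreamThread.
    public static final class ToyThread {
        private State state;

        public ToyThread(State initial) {
            this.state = initial;
        }

        // Assumed contract: return the old state on success, null if rejected.
        public synchronized State setState(State next) {
            if (state == State.DEAD) {
                return null;
            }
            if (state == State.PENDING_SHUTDOWN && next != State.DEAD) {
                return null;
            }
            State old = state;
            state = next;
            return old;
        }

        public synchronized State state() {
            return state;
        }
    }

    // Order used in the diff: attempt the transition, then check for shutdown.
    public static boolean transitionFirst(ToyThread t, boolean hasPartitions) {
        return (t.setState(State.PARTITIONS_REVOKED) != null
                || t.state() == State.PENDING_SHUTDOWN) && hasPartitions;
    }

    // Swapped order: a true shutdown check short-circuits and skips setState.
    public static boolean shutdownCheckFirst(ToyThread t, boolean hasPartitions) {
        return (t.state() == State.PENDING_SHUTDOWN
                || t.setState(State.PARTITIONS_REVOKED) != null) && hasPartitions;
    }

    public static void main(String[] args) {
        // Compare both orderings from every starting state.
        for (State s : State.values()) {
            ToyThread a = new ToyThread(s);
            ToyThread b = new ToyThread(s);
            boolean first = transitionFirst(a, true);
            boolean swapped = shutdownCheckFirst(b, true);
            System.out.println(s + ": " + first + "/" + swapped
                    + " ends in " + a.state() + "/" + b.state());
        }
    }
}
```

       In this single-threaded toy model both orderings return the same result and leave the same final state for every starting state, so the order would only matter if another thread changed the state between the two calls. Note also that in either order the transition is attempted before `!partitions.isEmpty()` is evaluated, so an empty revocation can still change the state.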

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -714,6 +714,13 @@ void runOnce() {
 
         final long pollLatency = pollPhase();
 
+        // Optimization to skip the rest of the processing loop in case the thread was requested to shut down during
+        // the poll phase

Review comment:
       Good idea, but I think we do this a few lines down.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

