[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593523423



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
   taskManager.activeTaskIds(),
   taskManager.standbyTaskIds());
 
-if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+// We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+// transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
   I think this is the correct order (assuming you mean the order of 
`streamThread.setState(State.PARTITIONS_REVOKED) != null` relative to 
`streamThread.state() == State.PENDING_SHUTDOWN`?). If the thread is not in 
PENDING_SHUTDOWN when it reaches this line, the first condition should 
evaluate to true, which is what we want even if the thread does get 
transitioned to PENDING_SHUTDOWN immediately after the transition to 
PARTITIONS_REVOKED.
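
   To make the ordering argument concrete, here is a minimal sketch of the 
short-circuit evaluation. It is a toy model, not the actual StreamThread: the 
`ToyThread` class, its simplified transition rules, and the 
`shouldHandleRevocation` helper are all assumptions made for illustration. 
The only behavior it borrows from the discussion is that `setState` returns 
null when a transition is rejected and that PENDING_SHUTDOWN may only move to 
DEAD.

```java
import java.util.Collection;
import java.util.List;

// Toy model of the condition under review; every name here is hypothetical.
final class RevocationOrderSketch {

    enum State { RUNNING, PARTITIONS_REVOKED, PENDING_SHUTDOWN, DEAD }

    static final class ToyThread {
        private State state;

        ToyThread(final State initial) {
            this.state = initial;
        }

        // Returns the previous state on success, or null if the transition is rejected.
        // Assumed rules: DEAD is terminal, and PENDING_SHUTDOWN may only move to DEAD.
        synchronized State setState(final State newState) {
            if (state == State.DEAD) {
                return null;
            }
            if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                return null;
            }
            final State old = state;
            state = newState;
            return old;
        }

        synchronized State state() {
            return state;
        }
    }

    // Same shape as the condition in the diff: the transition is attempted first,
    // and the PENDING_SHUTDOWN check only runs if that attempt was rejected.
    static boolean shouldHandleRevocation(final ToyThread thread, final Collection<String> partitions) {
        return (thread.setState(State.PARTITIONS_REVOKED) != null
                || thread.state() == State.PENDING_SHUTDOWN)
                && !partitions.isEmpty();
    }

    public static void main(final String[] args) {
        final List<String> revoked = List.of("topic-0");

        // Not shutting down: the transition succeeds, so the second check is skipped.
        final ToyThread running = new ToyThread(State.RUNNING);
        System.out.println(shouldHandleRevocation(running, revoked) + " " + running.state());

        // Shutdown already requested: the transition is rejected, the state check passes,
        // and the thread stays in PENDING_SHUTDOWN.
        final ToyThread stopping = new ToyThread(State.PENDING_SHUTDOWN);
        System.out.println(shouldHandleRevocation(stopping, revoked) + " " + stopping.state());

        // Already dead: neither condition holds.
        final ToyThread dead = new ToyThread(State.DEAD);
        System.out.println(shouldHandleRevocation(dead, revoked) + " " + dead.state());
    }
}
```

   If a shutdown request lands right after the successful transition, the 
first operand has already evaluated to true, so the result does not depend on 
re-reading the state.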





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593522324



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -714,6 +714,13 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
+// Optimization to skip the rest of the processing loop in case the thread was requested to shut down during
+// the poll phase

Review comment:
   🤦‍♀️ Oh wow, how did I not see that lol. I'll just bump the log to INFO




