kirktrue commented on code in PR #16569:
URL: https://github.com/apache/kafka/pull/16569#discussion_r1676322238


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -587,6 +617,14 @@ public void transitionToFatal() {
             return;
         }
 
+        if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) {
+            log.info("Member {} with epoch {} was leaving the group with state {} when it got a " +
+                "fatal error from the broker. It will discard the ongoing leave and remain in {} " +
+                "state.", memberId, memberEpoch, previousState, state);
+            maybeCompleteLeaveInProgress();

Review Comment:
   Per my other comment, there are no immediate checks around this call to `maybeCompleteLeaveInProgress()` that the state is `UNSUBSCRIBED`. From my reading, line 606 sets the state to `FATAL`.
   
   Again, not trying to nitpick here, I promise 😄. I'm just trying to understand the code and the expected state(s) when calling `maybeCompleteLeaveInProgress()`.
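   To make the ordering concrete, here is a heavily simplified, hypothetical model of the flow as I read it. It is not the real `MembershipManagerImpl`: `FatalTransitionModel` and the `MemberState` subset are made up for illustration. It records which state `maybeCompleteLeaveInProgress()` observes when a `LEAVING` member hits a fatal error:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of member states, for illustration only.
enum MemberState { STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED, FATAL }

// Hypothetical, simplified model of the transitionToFatal() ordering;
// not the actual MembershipManagerImpl.
class FatalTransitionModel {
    MemberState state;
    // Records the state observed each time maybeCompleteLeaveInProgress() runs.
    final List<MemberState> statesSeenByCompleteLeave = new ArrayList<>();

    FatalTransitionModel(MemberState initialState) {
        this.state = initialState;
    }

    void transitionToFatal() {
        MemberState previousState = state;
        // As read from the PR: the state becomes FATAL first...
        state = MemberState.FATAL;
        // ...and only afterwards is the ongoing leave (if any) completed.
        if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) {
            maybeCompleteLeaveInProgress();
        }
    }

    boolean maybeCompleteLeaveInProgress() {
        // No state check here, mirroring the concern raised in this review.
        statesSeenByCompleteLeave.add(state);
        return true;
    }
}
```

   If that reading is right, the call runs with the state already `FATAL`, so an `UNSUBSCRIBED`-only assumption would not hold on this path.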



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##########
@@ -78,14 +78,14 @@ public interface MembershipManager extends RequestManager {
     /**
      * Notify the member that an error heartbeat response was received.
      */
-    void onHeartbeatFailure();
+    void onHeartbeatFailure(boolean retriable);

Review Comment:
   From my understanding of the code, the `retriable` flag here only affects whether or not we update metrics. Is that correct? I'm not suggesting we change the flag name; it's just a little confusing (to me) from an API standpoint. Perhaps add some JavaDoc explaining which behavior the flag affects?
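   For example, something along these lines. The interface and class names are hypothetical, and the wording only describes what the diff appears to do: `maybeRecordRebalanceFailed()` is called only when `retriable` is false, and completing the leave does not depend on the flag.

```java
// Hypothetical excerpt standing in for MembershipManager; the JavaDoc wording
// is only a suggestion.
interface MembershipManagerSketch {
    /**
     * Notify the member that an error heartbeat response was received.
     *
     * @param retriable whether the error is retriable. Only non-retriable
     *                  failures are reported to the metrics manager as a
     *                  failed rebalance; retriable ones are not. The flag does
     *                  not affect whether a leave in progress is completed.
     */
    void onHeartbeatFailure(boolean retriable);
}

// Hypothetical stand-in that counts how often the metric hook would be called.
class CountingMembershipManager implements MembershipManagerSketch {
    int rebalanceFailedCalls = 0;

    @Override
    public void onHeartbeatFailure(boolean retriable) {
        // Mirrors the diff: metricsManager.maybeRecordRebalanceFailed() only when !retriable.
        if (!retriable) {
            rebalanceFailedCalls++;
        }
    }
}
```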



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -459,8 +464,30 @@ public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) {
     }
 
     @Override
-    public void onHeartbeatFailure() {
-        metricsManager.maybeRecordRebalanceFailed();
+    public void onHeartbeatFailure(boolean retriable) {
+        if (!retriable) {
+            metricsManager.maybeRecordRebalanceFailed();
+        }
+        // The leave group request is sent out once (not retried), so we should complete the leave
+        // operation once the request completes, regardless of the response.
+        if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
+            log.warn("Member {} with epoch {} received a failed response to the heartbeat to " +
+                "leave the group and completed the leave operation. ", memberId, memberEpoch);
+        }
+    }
+
+    /**
+     * Complete the leave in progress (if any), if the member is UNSUBSCRIBED. This is expected to

Review Comment:
   Not to nitpick, but technically `maybeCompleteLeaveInProgress()` doesn't check or enforce anything related to the current `MemberState`. Should there be a quick sanity check inside `maybeCompleteLeaveInProgress()` that the state is `UNSUBSCRIBED`?
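   A rough sketch of what such a check could look like. It assumes a hypothetical `CompletableFuture` handle for the pending leave and a hypothetical allow-list derived from the two call sites in this diff (`UNSUBSCRIBED` from `onHeartbeatFailure()`, `FATAL` from `transitionToFatal()`). None of these names are the actual fields in `MembershipManagerImpl`:

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical subset of member states, for illustration only.
enum MemberState { STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED, FATAL }

class LeaveCompletionSketch {
    // Hypothetical allow-list based on the two call sites in this PR.
    private static final Set<MemberState> LEAVE_COMPLETION_STATES =
        EnumSet.of(MemberState.UNSUBSCRIBED, MemberState.FATAL);

    MemberState state;
    // Hypothetical handle for the leave in progress; null when there is none.
    CompletableFuture<Void> leaveInProgress;

    LeaveCompletionSketch(MemberState state, CompletableFuture<Void> leaveInProgress) {
        this.state = state;
        this.leaveInProgress = leaveInProgress;
    }

    /** Returns true only if this call completed a pending leave. */
    boolean maybeCompleteLeaveInProgress() {
        // Sanity check: fail loudly if called from an unexpected state.
        if (!LEAVE_COMPLETION_STATES.contains(state)) {
            throw new IllegalStateException(
                "Unexpected state " + state + " when completing leave in progress");
        }
        if (leaveInProgress == null) {
            return false;
        }
        leaveInProgress.complete(null);
        leaveInProgress = null;
        return true;
    }
}
```

   Note that a strict `UNSUBSCRIBED`-only check would reject the `transitionToFatal()` call site, hence the two-state allow-list here. Throwing keeps misuse visible; logging a warning and returning `false` would be the gentler option if an unexpected call shouldn't fail the consumer.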


