lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1331809403


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -115,31 +115,34 @@ public int memberEpoch() {
 
     @Override
     public void updateState(ConsumerGroupHeartbeatResponseData response) {
-        if (response.errorCode() == Errors.NONE.code()) {
-            this.memberId = response.memberId();
-            this.memberEpoch = response.memberEpoch();
-            ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
-            if (assignment != null) {
-                setTargetAssignment(assignment);
-            }
-            maybeTransitionToStable();
-        } else {
-            if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
-                resetEpoch();
-                transitionTo(MemberState.FENCED);
-            } else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) {
-                transitionTo(MemberState.FAILED);
-            }
-            // TODO: handle other errors here to update state accordingly, mainly making the
-            //  distinction between the recoverable errors and the fatal ones, that should FAILED
-            //  the member
+        this.memberId = response.memberId();
+        this.memberEpoch = response.memberEpoch();
+        ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
+        if (assignment != null) {
+            setTargetAssignment(assignment);
         }
+        maybeTransitionToStable();
     }
 
     @Override
-    public boolean notInGroup() {
-        return state() == MemberState.UNJOINED ||
-            state() == MemberState.FAILED;
+    public void onFatalError(final short errorCode) {

Review Comment:
   This `onFatalError` does update the member's state, so separating it from 
`updateState` leaves the update logic and transitions in two places, which I 
think is harder to follow and troubleshoot.
   
   What if we go back to a single `updateState` responsible for updating state 
(i.e. member info and transitions)? If this single `updateState` returns the 
`Optional<Error>` that it may find in the response, then we could keep the 
error handling only in the MembershipManager, and the HeartbeatManager could be 
much simplified. Take a look at 
[this](https://github.com/apache/kafka/pull/14413) draft PR and let me know 
your thoughts.
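
   To make the suggestion concrete, here is a rough, self-contained sketch of a 
single `updateState` that owns every transition and hands any error back to the 
caller. It is not the code from the draft PR. `MemberState`, `HeartbeatError`, 
`HeartbeatResponse` and `MembershipSketch` are hypothetical stand-ins for the 
real Kafka types, the success path jumps straight to `STABLE` instead of calling 
`maybeTransitionToStable()`, and resetting the epoch to 0 is an assumption.

```java
import java.util.Optional;

// All types below are simplified, hypothetical stand-ins, not the real Kafka classes.
enum MemberState { UNJOINED, STABLE, FENCED, FAILED }

enum HeartbeatError { NONE, FENCED_MEMBER_EPOCH, UNKNOWN_MEMBER_ID, UNRELEASED_INSTANCE_ID }

// Stand-in for ConsumerGroupHeartbeatResponseData, reduced to three fields.
final class HeartbeatResponse {
    final HeartbeatError error;
    final String memberId;
    final int memberEpoch;

    HeartbeatResponse(HeartbeatError error, String memberId, int memberEpoch) {
        this.error = error;
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }
}

final class MembershipSketch {
    private MemberState state = MemberState.UNJOINED;
    private String memberId = "";
    private int memberEpoch = 0;

    // Single entry point: updates member info, performs the transition, and
    // returns the error found in the response (empty on success).
    Optional<HeartbeatError> updateState(HeartbeatResponse response) {
        switch (response.error) {
            case NONE:
                memberId = response.memberId;
                memberEpoch = response.memberEpoch;
                state = MemberState.STABLE; // simplification of maybeTransitionToStable()
                return Optional.empty();
            case FENCED_MEMBER_EPOCH:
            case UNKNOWN_MEMBER_ID:
                memberEpoch = 0; // assumed reset value, standing in for resetEpoch()
                state = MemberState.FENCED;
                break;
            case UNRELEASED_INSTANCE_ID:
                state = MemberState.FAILED;
                break;
        }
        return Optional.of(response.error);
    }

    MemberState state() { return state; }
    String memberId() { return memberId; }
    int memberEpoch() { return memberEpoch; }
}
```

   With this shape, a caller such as the heartbeat manager would only need to 
check whether the returned `Optional` is present (for example to decide on 
backoff or to propagate a fatal error), without touching the member state 
itself.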



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.