AndrewJSchofield commented on code in PR #21239:
URL: https://github.com/apache/kafka/pull/21239#discussion_r2661347415


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -432,6 +432,23 @@ private void onErrorResponse(final R response, final long 
currentTimeMs) {
                     "subscribe. " + errorMessage));
                 break;
 
+            case GROUP_ID_NOT_FOUND:
+                // If the group doesn't exist (e.g., member never joined due 
to InvalidTopicException),
+                // GROUP_ID_NOT_FOUND should be ignored - the leave is 
effectively complete.
+                // When a leave heartbeat (epoch=-1) is sent, the state 
transitions synchronously
+                // from LEAVING to UNSUBSCRIBED in 
onHeartbeatRequestGenerated() before the request is sent.
+                if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+                    logger.info("{} received GROUP_ID_NOT_FOUND for group {} 
while unsubscribed. " +
+                            "Not treating as fatal since consumer is leaving 
group.",
+                            heartbeatRequestName(), 
membershipManager().groupId());
+                    membershipManager().onHeartbeatRequestSkipped();
+                } else {
+                    // Else, this is a fatal error, we should throw it and 
transition to fatal state.
+                    logger.error("{} failed due to unexpected error {}: {}",

Review Comment:
   nit: The following `logger.error` is identical but on a single line. Please 
change this to a single line also.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -432,6 +432,23 @@ private void onErrorResponse(final R response, final long 
currentTimeMs) {
                     "subscribe. " + errorMessage));
                 break;
 
+            case GROUP_ID_NOT_FOUND:
+                // If the group doesn't exist (e.g., member never joined due 
to InvalidTopicException),
+                // GROUP_ID_NOT_FOUND should be ignored - the leave is 
effectively complete.
+                // When a leave heartbeat (epoch=-1) is sent, the state 
transitions synchronously
+                // from LEAVING to UNSUBSCRIBED in 
onHeartbeatRequestGenerated() before the request is sent.
+                if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+                    logger.info("{} received GROUP_ID_NOT_FOUND for group {} 
while unsubscribed. " +
+                            "Not treating as fatal since consumer is leaving 
group.",

Review Comment:
   I would remove the "Not treating as fatal since consumer is leaving group.". 
There's no sense in using "fatal" in an information log line. This is nothing 
to worry about in the slightest.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -432,6 +432,23 @@ private void onErrorResponse(final R response, final long 
currentTimeMs) {
                     "subscribe. " + errorMessage));
                 break;
 
+            case GROUP_ID_NOT_FOUND:
+                // If the group doesn't exist (e.g., member never joined due 
to InvalidTopicException),
+                // GROUP_ID_NOT_FOUND should be ignored - the leave is 
effectively complete.
+                // When a leave heartbeat (epoch=-1) is sent, the state 
transitions synchronously
+                // from LEAVING to UNSUBSCRIBED in 
onHeartbeatRequestGenerated() before the request is sent.
+                if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+                    logger.info("{} received GROUP_ID_NOT_FOUND for group {} 
while unsubscribed. " +
+                            "Not treating as fatal since consumer is leaving 
group.",
+                            heartbeatRequestName(), 
membershipManager().groupId());
+                    membershipManager().onHeartbeatRequestSkipped();
+                } else {
+                    // Else, this is a fatal error, we should throw it and 
transition to fatal state.
+                    logger.error("{} failed due to unexpected error {}: {}",
+                            heartbeatRequestName(), error, errorMessage);
+                    handleFatalFailure(error.exception(errorMessage));
+                }
+                break;

Review Comment:
   nit: Let's have a blank line following the break to match the other cases in 
this switch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to