AndrewJSchofield commented on code in PR #21239:
URL: https://github.com/apache/kafka/pull/21239#discussion_r2661347415
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -432,6 +432,23 @@ private void onErrorResponse(final R response, final long
currentTimeMs) {
"subscribe. " + errorMessage));
break;
+ case GROUP_ID_NOT_FOUND:
+ // If the group doesn't exist (e.g., member never joined due
to InvalidTopicException),
+ // GROUP_ID_NOT_FOUND should be ignored - the leave is
effectively complete.
+ // When a leave heartbeat (epoch=-1) is sent, the state
transitions synchronously
+ // from LEAVING to UNSUBSCRIBED in
onHeartbeatRequestGenerated() before the request is sent.
+ if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+ logger.info("{} received GROUP_ID_NOT_FOUND for group {}
while unsubscribed. " +
+ "Not treating as fatal since consumer is leaving
group.",
+ heartbeatRequestName(),
membershipManager().groupId());
+ membershipManager().onHeartbeatRequestSkipped();
+ } else {
+ // Else, this is a fatal error, we should throw it and
transition to fatal state.
+ logger.error("{} failed due to unexpected error {}: {}",
Review Comment:
nit: The following `logger.error` is identical but on a single line. Please
change this to a single line also.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -432,6 +432,23 @@ private void onErrorResponse(final R response, final long
currentTimeMs) {
"subscribe. " + errorMessage));
break;
+ case GROUP_ID_NOT_FOUND:
+ // If the group doesn't exist (e.g., member never joined due
to InvalidTopicException),
+ // GROUP_ID_NOT_FOUND should be ignored - the leave is
effectively complete.
+ // When a leave heartbeat (epoch=-1) is sent, the state
transitions synchronously
+ // from LEAVING to UNSUBSCRIBED in
onHeartbeatRequestGenerated() before the request is sent.
+ if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+ logger.info("{} received GROUP_ID_NOT_FOUND for group {}
while unsubscribed. " +
+ "Not treating as fatal since consumer is leaving
group.",
Review Comment:
I would remove the "Not treating as fatal since consumer is leaving group.".
There's no sense in using "fatal" in an information log line. This is nothing
to worry about in the slightest.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -432,6 +432,23 @@ private void onErrorResponse(final R response, final long
currentTimeMs) {
"subscribe. " + errorMessage));
break;
+ case GROUP_ID_NOT_FOUND:
+ // If the group doesn't exist (e.g., member never joined due
to InvalidTopicException),
+ // GROUP_ID_NOT_FOUND should be ignored - the leave is
effectively complete.
+ // When a leave heartbeat (epoch=-1) is sent, the state
transitions synchronously
+ // from LEAVING to UNSUBSCRIBED in
onHeartbeatRequestGenerated() before the request is sent.
+ if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+ logger.info("{} received GROUP_ID_NOT_FOUND for group {}
while unsubscribed. " +
+ "Not treating as fatal since consumer is leaving
group.",
+ heartbeatRequestName(),
membershipManager().groupId());
+ membershipManager().onHeartbeatRequestSkipped();
+ } else {
+ // Else, this is a fatal error, we should throw it and
transition to fatal state.
+ logger.error("{} failed due to unexpected error {}: {}",
+ heartbeatRequestName(), error, errorMessage);
+ handleFatalFailure(error.exception(errorMessage));
+ }
+ break;
Review Comment:
nit: Let's have a blank line following the break to match the other cases in
this switch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]