[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r585070038

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                         }
                     }
                 } else {
-                    requestRejoin();

Review comment:
Just to clarify, you mean we don't need to rejoin here since we will always raise an error, and always rejoin (if necessary) when checking that error? Or are you referring to the `requestRejoinOnResponseError` calls you added to the two last cases in the below if/else?

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
         return generation.memberId;
     }

-    private synchronized void resetState() {
+    private synchronized void resetState(final String reason) {

Review comment:
nit: rename to `resetStateAndGeneration`?

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse,
                     log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}", error.message(), sentGeneration);
                     markCoordinatorUnknown(error);
+                    requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error);

Review comment:
Why do we explicitly rejoin in this case, but not eg `REBALANCE_IN_PROGRESS`? or `UNKNOWN_MEMBER_ID`/`ILLEGAL_GENERATION`?

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
         return generation.memberId;
     }

-    private synchronized void resetState() {
+    private synchronized void resetState(final String reason) {
+        log.info("Resetting generation due to: {}", reason);
+
         state = MemberState.UNJOINED;
         generation = Generation.NO_GENERATION;
     }

-    private synchronized void resetStateAndRejoin() {
-        resetState();
-        rejoinNeeded = true;
+    private synchronized void resetStateAndRejoin(final String reason) {
+        resetState(reason);
+        requestRejoin(reason);
     }

     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
-        log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);

Review comment:
SGTM. If we find it flooding the logs and not helpful we can reconsider

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593463366

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                         }
                     }
                 } else {
-                    requestRejoin();

Review comment:
@guozhangwang I think something may have been messed up during a merge/rebase: I no longer see `requestRejoinOnResponseError` being invoked anywhere
[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593510070

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                         }
                     }
                 } else {
-                    requestRejoin();

Review comment:
Ok cool, thanks. One last question then: after this refactoring, since we no longer call `requestRejoinOnResponseError` below, should we re-add the `requestRejoin()` call here? Or add a `requestRejoin` to the specific cases in the SyncGroup handler, eg
```
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
    log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. "
                 + "Sent generation was {}", sentGeneration);
    future.raise(error);
}
```
[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593521930

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
                         }
                     }
                 } else {
-                    requestRejoin();

Review comment:
Hmm...but `resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));` is only called in `joinGroupIfNeeded` which is only called in `ensureActiveGroup`, which is in turn only invoked in `ConsumerCoordinator#poll`.

That said, inside `SyncGroupResponseHandler#handle` we would already have `rejoinNeeded = true` and only set it to false if the SyncGroup succeeds. So for that reason I guess we don't need the `requestRejoin` anywhere inside the SyncGroup handler
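
Editor's sketch of the reasoning in the last comment on this thread. This is a toy Java model, not the actual `AbstractCoordinator` code: it assumes only what the comment states, namely that `rejoinNeeded` is already true when the SyncGroup response is handled and is cleared only on success. The class `RejoinFlagSketch`, the enum `SyncOutcome`, the `handleSyncGroup` method and the recorded reason strings are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the rejoin flag; all names except rejoinNeeded/requestRejoin are hypothetical. */
final class RejoinFlagSketch {

    /** Hypothetical stand-in for the outcome of a SyncGroup response. */
    enum SyncOutcome { SUCCESS, REBALANCE_IN_PROGRESS }

    static final class Coordinator {
        // Assumption: a member that has not completed a rebalance still needs to (re)join.
        private boolean rejoinNeeded = true;
        private final List<String> reasons = new ArrayList<>();

        /** Sets the flag and records why, mirroring the "every rejoin has a reason" idea. */
        synchronized void requestRejoin(String reason) {
            reasons.add(reason);
            rejoinNeeded = true;
        }

        synchronized boolean rejoinNeeded() {
            return rejoinNeeded;
        }

        synchronized List<String> reasons() {
            return new ArrayList<>(reasons);
        }

        /** Models the SyncGroup handler: only the success path clears the flag. */
        synchronized boolean handleSyncGroup(SyncOutcome outcome) {
            if (outcome == SyncOutcome.SUCCESS) {
                rejoinNeeded = false;
                return true;
            }
            // Error path: no requestRejoin() call here; the flag was never cleared.
            return false;
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator();
        c.handleSyncGroup(SyncOutcome.REBALANCE_IN_PROGRESS);
        System.out.println("after failed sync: rejoinNeeded = " + c.rejoinNeeded());
        c.handleSyncGroup(SyncOutcome.SUCCESS);
        System.out.println("after successful sync: rejoinNeeded = " + c.rejoinNeeded());
        c.requestRejoin("hypothetical reason: subscription changed");
        System.out.println("after requestRejoin: rejoinNeeded = " + c.rejoinNeeded());
    }
}
```

In this model a failed SyncGroup leaves the flag set without any extra call, which is the comment's point; an explicit `requestRejoin(reason)` only matters after a successful sync has cleared the flag.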