[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593521930



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   Hmm...but `resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));`
   is only called in `joinGroupIfNeeded`, which is only called in 
   `ensureActiveGroup`, which is in turn only invoked in 
   `ConsumerCoordinator#poll`.
   That said, inside `SyncGroupResponseHandler#handle` we would already have 
   `rejoinNeeded = true`, and we only set it to false if the SyncGroup succeeds. 
   So for that reason I guess we don't need the `requestRejoin` anywhere inside 
   the SyncGroup handler.
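
For readers following the thread, the argument above can be sketched as a tiny state model. This is a simplified illustration, not the actual `AbstractCoordinator` code: the class name `RejoinFlagSketch` and the method `onSyncGroupResponse` are hypothetical, and the only behavior taken from the comment is that `rejoinNeeded` is already true when the SyncGroup response arrives and is cleared only on success.

```java
// Hypothetical, simplified model of the rejoin flag; NOT the real AbstractCoordinator.
final class RejoinFlagSketch {
    // Assumption: a member that has not yet completed a rebalance needs to (re)join.
    private boolean rejoinNeeded = true;

    // Stand-in for requestRejoin(reason): marks the member as needing to rejoin.
    synchronized void requestRejoin(String reason) {
        rejoinNeeded = true;
    }

    // Stand-in for the SyncGroup response handler: only success clears the flag.
    synchronized void onSyncGroupResponse(boolean succeeded) {
        if (succeeded) {
            rejoinNeeded = false;
        }
        // On failure nothing is written: the flag is still true from before
        // the rebalance started, so an extra requestRejoin would be redundant.
    }

    synchronized boolean rejoinNeeded() {
        return rejoinNeeded;
    }
}
```

Under these assumptions a failed SyncGroup leaves the flag set, so the next poll would attempt to rejoin without any explicit `requestRejoin` call in the handler.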





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593510070



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   Ok cool, thanks. One last question then: after this refactoring, since 
we no longer call `requestRejoinOnResponseError` below, should we re-add the 
`requestRejoin()` call here? Or add a `requestRejoin` to the specific cases in 
the SyncGroup handler, e.g.
   ```
   } else if (error == Errors.REBALANCE_IN_PROGRESS) {
       log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                "Sent generation was {}", sentGeneration);
       future.raise(error);
   }
   ```









[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593463366



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   @guozhangwang I think something may have been messed up during a 
merge/rebase: I no longer see `requestRejoinOnResponseError` being invoked 
anywhere.









[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-01 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r585070038



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   Just to clarify, do you mean we don't need to rejoin here since we will 
always raise an error, and always rejoin (if necessary) when checking that 
error?
   
   Or are you referring to the `requestRejoinOnResponseError` calls you added 
to the last two cases in the if/else below?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
 return generation.memberId;
 }
 
-private synchronized void resetState() {
+private synchronized void resetState(final String reason) {

Review comment:
   nit: rename to `resetStateAndGeneration`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse,
 log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
  error.message(), sentGeneration);
 markCoordinatorUnknown(error);
+requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error);

Review comment:
   Why do we explicitly rejoin in this case, but not for e.g. 
`REBALANCE_IN_PROGRESS`, or `UNKNOWN_MEMBER_ID`/`ILLEGAL_GENERATION`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
 return generation.memberId;
 }
 
-private synchronized void resetState() {
+private synchronized void resetState(final String reason) {
+log.info("Resetting generation due to: {}", reason);
+
 state = MemberState.UNJOINED;
 generation = Generation.NO_GENERATION;
 }
 
-private synchronized void resetStateAndRejoin() {
-resetState();
-rejoinNeeded = true;
+private synchronized void resetStateAndRejoin(final String reason) {
+resetState(reason);
+requestRejoin(reason);
 }
 
 synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
-log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);

Review comment:
   SGTM. If we find it flooding the logs without being helpful, we can reconsider.
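
To make the shape of the change in the hunk above easier to follow, here is a minimal standalone sketch of threading a reason string through the reset and rejoin paths. It is an illustration under stated assumptions, not the PR's code: `ReasonedResetSketch`, the integer `generationId` (with `-1` standing in for `Generation.NO_GENERATION`), the `lastLogLine` field used in place of a logger, and the wording of the message in `requestRejoin` are all hypothetical. Only the "Resetting generation due to" message and the reset-then-request ordering come from the diff.

```java
// Hypothetical, simplified model; NOT the real AbstractCoordinator.
final class ReasonedResetSketch {
    enum MemberState { UNJOINED, STABLE }

    private MemberState state = MemberState.STABLE;
    private int generationId = 5;          // assumption: -1 models NO_GENERATION
    private boolean rejoinNeeded = false;
    private String lastLogLine = "";       // stand-in for a real logger

    // Mirrors resetState(reason) from the diff: log the reason, then reset.
    synchronized void resetState(String reason) {
        lastLogLine = "Resetting generation due to: " + reason;
        state = MemberState.UNJOINED;
        generationId = -1;
    }

    // Assumed body: the diff only shows that requestRejoin takes a reason.
    synchronized void requestRejoin(String reason) {
        lastLogLine = "Requesting rejoin due to: " + reason;
        rejoinNeeded = true;
    }

    // Mirrors resetStateAndRejoin(reason): reset first, then request the rejoin.
    synchronized void resetStateAndRejoin(String reason) {
        resetState(reason);
        requestRejoin(reason);
    }

    synchronized MemberState state() { return state; }
    synchronized int generationId() { return generationId; }
    synchronized boolean rejoinNeeded() { return rejoinNeeded; }
    synchronized String lastLogLine() { return lastLogLine; }
}
```

With this shape every caller has to supply a reason, so each reset or rejoin leaves a log line explaining why it happened.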




