[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-02-28 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r584479103



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -961,29 +962,34 @@ protected synchronized String memberId() {
 return generation.memberId;
 }
 
-private synchronized void resetState() {
+private synchronized void resetState(final String reason) {
+log.info("Resetting generation due to: {}", reason);
+
 state = MemberState.UNJOINED;
 generation = Generation.NO_GENERATION;
 }
 
-private synchronized void resetStateAndRejoin() {
-resetState();
-rejoinNeeded = true;
+private synchronized void resetStateAndRejoin(final String reason) {
+resetState(reason);
+requestRejoin(reason);
 }
 
 synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
-log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);

Review comment:
   Note that I intentionally bumped the log level from debug to info here, since I think this is a message users should pay attention to in production, where they mostly run at INFO. Open to counter-suggestions, though.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   We can remove this, since it is now redundant: we already call it in each case where it is necessary.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -860,7 +861,7 @@ public void onSuccess(ClientResponse resp, RequestFuture future) {
 
 @Override
 public void onFailure(RuntimeException e, RequestFuture future) {
-log.debug("FindCoordinator request failed due to {}", e);
+log.debug("FindCoordinator request failed due to {}", e.toString());

Review comment:
   Minor cleanup: we only need to print the error message, not the stack trace.
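The difference between the two `log.debug` calls comes down to what reaches the log. This small Java sketch uses only the JDK (no logging library) to compare the one-line `e.toString()` summary with the multi-line stack trace that a logging backend typically prints when it is handed the exception itself; the class and method names are made up for illustration.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Illustrative only: what "{}" is filled with after the change, versus a full stack trace.
public class ExceptionToStringSketch {
    // One line: the exception's class name and message, no stack frames.
    static String summary(RuntimeException e) {
        return e.toString();
    }

    // Multi-line: the summary followed by one "\tat ..." line per stack frame.
    static String fullTrace(RuntimeException e) {
        StringWriter sw = new StringWriter();
        e.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    public static void main(String[] args) {
        RuntimeException e = new IllegalStateException("coordinator not available");
        System.out.println(summary(e));   // java.lang.IllegalStateException: coordinator not available
        System.out.println(fullTrace(e)); // same first line, then the stack frames
    }
}
```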





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-05 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r588842618



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   I meant the latter: we already call it inside the conditions where it is needed. For the fatal errors we do not need to call it anyway, since the consumer will throw and crash.
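The handler-side half of this can be sketched in Java as follows: each error branch requests a rejoin only when it needs one, fatal errors simply throw (so no rejoin is requested), and retriable coordinator errors are left to the caller. The enum borrows Kafka error names, but the class, its fields and this particular mapping are illustrative assumptions, not Kafka's actual SyncGroup response handler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified SyncGroup error handling; names echo Kafka's Errors enum,
// but the mapping below is illustrative only.
public class SyncErrorHandlingSketch {
    enum SyncError {
        NONE, GROUP_AUTHORIZATION_FAILED, REBALANCE_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE, UNKNOWN_SERVER_ERROR
    }

    final List<String> rejoinReasons = new ArrayList<>();
    boolean coordinatorUnknown = false;

    void requestRejoin(final String reason) {
        rejoinReasons.add(reason);
    }

    // Each branch requests a rejoin only when it needs one; there is no
    // unconditional requestRejoin() at the end of the error path.
    void handleSyncError(final SyncError error) {
        switch (error) {
            case NONE:
                return;
            case REBALANCE_IN_PROGRESS:
                requestRejoin("SyncGroup failed with " + error);
                return;
            case COORDINATOR_NOT_AVAILABLE:
                // Retriable: mark the coordinator unknown and leave reset/rejoin to the caller's fallback.
                coordinatorUnknown = true;
                return;
            case GROUP_AUTHORIZATION_FAILED:
                // Fatal: the exception propagates and the consumer fails, so a rejoin would be pointless.
                throw new IllegalStateException("fatal SyncGroup error: " + error);
            default:
                // Unexpected errors are surfaced to the caller as well.
                throw new IllegalStateException("unexpected SyncGroup error: " + error);
        }
    }

    public static void main(String[] args) {
        SyncErrorHandlingSketch handler = new SyncErrorHandlingSketch();
        handler.handleSyncError(SyncError.REBALANCE_IN_PROGRESS);
        handler.handleSyncError(SyncError.COORDINATOR_NOT_AVAILABLE);
        System.out.println(handler.rejoinReasons); // [SyncGroup failed with REBALANCE_IN_PROGRESS]
        try {
            handler.handleSyncError(SyncError.GROUP_AUTHORIZATION_FAILED);
        } catch (IllegalStateException e) {
            System.out.println("consumer would fail: " + e.getMessage());
        }
    }
}
```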









[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-05 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r588847540



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse,
 log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
  error.message(), sentGeneration);
 markCoordinatorUnknown(error);
+requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error);

Review comment:
   You're right, we do not; I've updated this section.









[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593504141



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   I added that function for the SyncGroup handler to handle the retriable `COORDINATOR_NOT_AVAILABLE` / `NOT_COORDINATOR` errors and any unexpected error. After the refactoring PR, they now all fall into the fallback in `joinGroupIfNeeded`:
   
   ```
   final RuntimeException exception = future.exception();

   resetJoinGroupFuture();

   // errors the join/sync-group handlers already reacted to: just retry
   if (exception instanceof UnknownMemberIdException ||
           exception instanceof IllegalGenerationException ||
           exception instanceof RebalanceInProgressException ||
           exception instanceof MemberIdRequiredException)
       continue;
   else if (!future.isRetriable())
       throw exception;

   // fallback for the remaining retriable errors
   resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));
   timer.sleep(rebalanceConfig.retryBackoffMs);
   ```
   
   This is part of the principle I mentioned:
   
   ```
   We may reset generation and request rejoin in two different places: 1) in the join/sync-group handlers, and 2) in joinGroupIfNeeded, when the future is received. The principle is that these two should not overlap, and 2) is used as a fallback for those common errors from join/sync that we do not handle specifically.
   ```
   
   But I forgot to remove this function as part of the second pass; will remove.
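The principle can be illustrated with a small, self-contained Java sketch that follows the shape of the `joinGroupIfNeeded` fragment above: errors a handler already reacted to are simply retried, non-retriable ones are rethrown, and only the remaining retriable errors go through `resetStateAndRejoin`, so each failure leads to at most one rejoin request. The exception classes, the list of simulated attempt outcomes and the `outcomes` helper are hypothetical stand-ins for Kafka's `RequestFuture` plumbing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the "two places, no overlap" principle; names are illustrative, not Kafka's.
public class JoinLoopFallbackSketch {
    // Retriable error that no handler treats specifically (e.g. coordinator not available).
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    // Error the handler already reacted to by resetting/rejoining itself.
    static class HandledError extends RuntimeException {
        HandledError(String message) { super(message); }
    }

    final List<String> rejoinReasons = new ArrayList<>();

    void resetStateAndRejoin(final String reason) {
        rejoinReasons.add(reason);
    }

    // Each element is the outcome of one simulated join attempt; null means the attempt succeeded.
    boolean joinGroupIfNeeded(final List<RuntimeException> attemptOutcomes) {
        for (RuntimeException exception : attemptOutcomes) {
            if (exception == null) {
                return true;
            }
            if (exception instanceof HandledError) {
                continue; // handled in the handler already; rejoining here would do it twice
            } else if (!(exception instanceof RetriableError)) {
                throw exception; // non-retriable: surface it to the caller
            }
            // Single fallback for retriable errors no handler dealt with.
            resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));
            // A real loop would back off here before the next attempt.
        }
        return false;
    }

    static List<RuntimeException> outcomes(RuntimeException... results) {
        return Arrays.asList(results);
    }

    public static void main(String[] args) {
        JoinLoopFallbackSketch loop = new JoinLoopFallbackSketch();
        boolean joined = loop.joinGroupIfNeeded(outcomes(
                new RetriableError("coordinator not available"),
                new HandledError("rebalance in progress"),
                null));
        System.out.println(joined + " after " + loop.rejoinReasons.size() + " fallback rejoin(s)");
    }
}
```

With the simulated outcomes in `main`, only the retriable failure triggers a fallback rejoin; the handled error is retried without a second request.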









[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593515777



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   I think we do not need to, since it is already called in `resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));`. Previously we were calling rejoin twice.




