guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r480425820



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
             joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                 @Override
                 public void onSuccess(ByteBuffer value) {
-                    // handle join completion in the callback so that the callback will be invoked

Review comment:
       Well, I should say part of that (enabling the heartbeat thread) is in 
the JoinGroup response handler, while the rest (updating metrics, etc.) is in 
the SyncGroup response handler.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
             case CompletingRebalance =>
-                responseCallback(Errors.REBALANCE_IN_PROGRESS)
+              // consumers may start sending heartbeat after join-group response, in which case
+              // we should treat them as normal hb request and reset the timer
+              val member = group.get(memberId)

Review comment:
       It returned the error code before because the broker did not expect 
clients to send heartbeats before sending sync-group requests. That is no 
longer the case.
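
       A minimal sketch of the behavior change, not the actual coordinator 
code. All names here (`HeartbeatSketch`, `GroupState`, `Code`, 
`handleHeartbeat`) are hypothetical, and returning a plain success code in 
`COMPLETING_REBALANCE` is an assumption based on the "treat them as normal hb 
request" comment in the hunk; the hunk shown above does not include the new 
response line.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of broker-side heartbeat handling.
// None of these names come from GroupCoordinator.scala.
public class HeartbeatSketch {
    enum GroupState { COMPLETING_REBALANCE, STABLE }
    enum Code { NONE, REBALANCE_IN_PROGRESS }

    GroupState state = GroupState.STABLE;

    // memberId -> time of the last heartbeat that reset the session timer
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    Code handleHeartbeat(String memberId, long nowMs) {
        switch (state) {
            case COMPLETING_REBALANCE:
                // Old behavior: answer REBALANCE_IN_PROGRESS and leave the timer alone.
                // New behavior: treat it as a normal heartbeat and reset the timer.
                lastHeartbeatMs.put(memberId, nowMs);
                return Code.NONE; // assumed response code, not confirmed by the hunk
            default:
                lastHeartbeatMs.put(memberId, nowMs);
                return Code.NONE;
        }
    }

    // Returns null if the member has never sent a heartbeat.
    Long lastHeartbeat(String memberId) {
        return lastHeartbeatMs.get(memberId);
    }
}
```

       The point is only that a heartbeat arriving between the join-group 
response and the sync-group request now keeps the member's session alive.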

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
             case CompletingRebalance =>
-                responseCallback(Errors.REBALANCE_IN_PROGRESS)

Review comment:
       I had a discussion with @hachikuji about this. I think logically it 
should not return `REBALANCE_IN_PROGRESS`, and clients should update their 
handling logic in the future too, maybe after some releases when we can break 
client-broker compatibility.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
-                    log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                    log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+                         "the rebalance callback is triggered, marking this rebalance as failed and retry",
+                         generation, state);
                     resetStateAndRejoin();
                     resetJoinGroupFuture();
-                    return false;
                 }
             } else {
                 final RuntimeException exception = future.exception();
-                log.info("Join group failed with {}", exception.toString());
+                log.info("Rebalance failed with {}", exception.toString());

Review comment:
       The reason I changed it is exactly that it may not always be due to 
join-group :) If sync-group fails, this could also be triggered.
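
       A rough illustration of why the message says "Rebalance" rather than 
"Join group". This is a sketch using `CompletableFuture`, not the 
`RequestFuture` chain in `AbstractCoordinator`; the class, the method names, 
and the error messages are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the join-group -> sync-group chain.
public class RebalanceFailureSketch {

    static CompletableFuture<String> joinGroup(boolean fail) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (fail) f.completeExceptionally(new RuntimeException("join-group error"));
        else f.complete("member-1");
        return f;
    }

    static CompletableFuture<String> syncGroup(String memberId, boolean fail) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (fail) f.completeExceptionally(new RuntimeException("sync-group error"));
        else f.complete("assignment for " + memberId);
        return f;
    }

    // One failure path covers both stages, so the log line cannot assume
    // that the join-group request was the one that failed.
    static String rebalance(boolean failJoin, boolean failSync) {
        return joinGroup(failJoin)
            .thenCompose(memberId -> syncGroup(memberId, failSync))
            .handle((String assignment, Throwable error) -> {
                if (error == null) return "Rebalance succeeded with " + assignment;
                // Dependent stages wrap the original exception; unwrap it for the message.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
                return "Rebalance failed with " + cause.getMessage();
            })
            .join();
    }
}
```

       Failing either stage ends up in the same handler, which is the 
situation the renamed log message is meant to cover.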



