cadonna commented on code in PR #15035:
URL: https://github.com/apache/kafka/pull/15035#discussion_r1431466392


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() {
     @Override
     public void transitionToStaled() {
         memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-        currentAssignment.clear();

Review Comment:
   Why do we not need to remove the current assignment if we leave the group? 
Is this because of `updateSubscription()` on line 677?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
+        // If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before
+        // sending another leave group.

Review Comment:
   Could you please elaborate on why we need to wait? It is not clear to me, 
because I thought that we could leave the group whenever we want.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
+        // If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before
+        // sending another leave group.
+        if (pollTimer.isExpired() && membershipManager.state() == MemberState.STABLE) {

Review Comment:
   Is there a unit test that tests the behavior when this condition is not 
satisfied?
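
   To make the request concrete, here is a minimal, self-contained sketch of the 
cases such a test could cover. It is not the PR's test code: `PollTimerGuardSketch`, 
`shouldSendLeaveGroup`, and the two-value `MemberState` enum are hypothetical 
stand-ins that only mirror the condition in the diff above.

```java
public class PollTimerGuardSketch {
    // Hypothetical stand-in for the real MemberState enum; only two states are modelled.
    public enum MemberState { STABLE, RECONCILING }

    // Mirrors the condition in the diff: the leave-group path is taken only when
    // the poll timer has expired AND the member is in the STABLE state.
    public static boolean shouldSendLeaveGroup(boolean pollTimerExpired, MemberState state) {
        return pollTimerExpired && state == MemberState.STABLE;
    }

    public static void main(String[] args) {
        // Condition satisfied: expired timer, stable member.
        System.out.println(shouldSendLeaveGroup(true, MemberState.STABLE));
        // Condition not satisfied: expired timer, but reconciliation is in progress.
        System.out.println(shouldSendLeaveGroup(true, MemberState.RECONCILING));
        // Condition not satisfied: timer has not expired.
        System.out.println(shouldSendLeaveGroup(false, MemberState.STABLE));
    }
}
```

   The second case is the one I am asking about: the timer has expired, but the 
leave group must be deferred until the member is stable again.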



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() {
         MemberState state = state();
         if (isStaled()) {
             log.debug("Member {} is staled and is therefore leaving the group.  It will rejoin upon the next poll.", memberEpoch);
-            transitionToJoining();
+            // clear the current assignment and subscription, and trigger rebalance listener on the next poll
+            invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> {

Review Comment:
   Is it correct to call `onPartitionsRevoked()` here? Technically, the member 
is no longer a member of the group. I am not sure it should be allowed to do 
anything group-related, such as committing offsets, which is one of the main 
purposes of `onPartitionsRevoked()`.
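
   To illustrate the concern, here is a rough model, not Kafka code. 
`RevokedCallbackSketch` and `FakeCoordinator` are hypothetical, the rejection 
rule is an assumption made for the sake of the example rather than documented 
broker behavior, and the constant is assumed to match 
`ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RevokedCallbackSketch {
    // Assumed to match ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH.
    public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    // Hypothetical coordinator that, in this model, rejects commits from a member that has left.
    public static class FakeCoordinator {
        public final List<String> committed = new ArrayList<>();

        public boolean commit(int memberEpoch, List<String> partitions) {
            if (memberEpoch == LEAVE_GROUP_MEMBER_EPOCH) {
                return false; // member already left the group: commit rejected in this model
            }
            committed.addAll(partitions);
            return true;
        }
    }

    // What a typical onPartitionsRevoked() callback does: commit offsets for the revoked partitions.
    public static boolean onPartitionsRevoked(FakeCoordinator coordinator, int memberEpoch, List<String> revoked) {
        return coordinator.commit(memberEpoch, revoked);
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator();
        // Callback runs while the member still holds a valid epoch: commit goes through.
        System.out.println(onPartitionsRevoked(coordinator, 5, Arrays.asList("topic-0")));
        // Callback runs after the epoch was set to the leave-group value: commit is rejected.
        System.out.println(onPartitionsRevoked(coordinator, LEAVE_GROUP_MEMBER_EPOCH, Arrays.asList("topic-1")));
    }
}
```

   If the real coordinator behaves anything like this model, a user callback that 
commits offsets would fail once `memberEpoch` has been set to the leave-group value.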


