philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1332505218


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -237,4 +253,64 @@ public void 
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
         }
         maybeTransitionToStable();
     }
+
+    /**
+     * Performs the reconciliation between the current topic assignment and 
the {@link Assignment} provided in the
+     * heartbeat response. Per the KIP-848 protocol, we perform the following:
+     *
+     * <ol>
+     *     <li>Revoke partitions, if any</li>
+     *     <li>Heartbeat to acknowledge revoked partitions</li>
+     *     <li>Assign partitions, if any</li>
+     *     <li>Heartbeat to acknowledge assigned partitions</li>
+     * </ol>
+     *
+     * TODO: What are the the state changes here?
+     * TODO: Is SubscriptionState sufficient to build the next heartbeat 
request?
+     * TODO: Where do we need to call 
ConsumerRebalanceListener.onPartitionsLost()?
+     */
+    void reconcile() {
+        transitionTo(MemberState.RECONCILING);
+
+        Timer remainingAssignmentTime = time.timer(10000);
+
+        // First, we need to determine if any partitions need to be revoked.
+        {
+            ReconciliationResult result = reconciler.revoke(targetAssignment, 
remainingAssignmentTime);
+            remainingAssignmentTime.update();
+
+            if (result == ReconciliationResult.COMPLETED) {
+                // If we've revoked one or more partitions, we need to send an 
acknowledgement request ASAP to
+                // let the coordinator know that they've been removed locally.
+                return;
+            } else if (result == ReconciliationResult.EXPIRED) {
+                // TODO: what do we do here?

Review Comment:
   This is the description from kip-848 
   ```
   The chosen member is expected to complete the assignment process within the 
rebalance timeout. The time on the coordinator side starts ticking when the 
member is notified. If the process is not completed within the rebalance 
timeout, the group coordinator picks up another member to run the assignment. 
Note that the previous chosen member is not fenced here because the fencing is 
only done based on the session.
   ```
   
   It seems that you should send a heartbeat to the GC and get a fenced 
exception, which results in revoking all partitions.
   
   @dajac can provide a more accurate answer here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to