lianetm commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1334676439
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -237,4 +253,64 @@ public void
updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
}
maybeTransitionToStable();
}
+
+ /**
+ * Performs the reconciliation between the current topic assignment and
the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * <ol>
+ * <li>Revoke partitions, if any</li>
+ * <li>Heartbeat to acknowledge revoked partitions</li>
+ * <li>Assign partitions, if any</li>
+ * <li>Heartbeat to acknowledge assigned partitions</li>
+ * </ol>
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat
request?
+ * TODO: Where do we need to call
ConsumerRebalanceListener.onPartitionsLost()?
+ */
+ void reconcile() {
+ transitionTo(MemberState.RECONCILING);
+
+ Timer remainingAssignmentTime = time.timer(10000);
+
+ // First, we need to determine if any partitions need to be revoked.
+ {
+ ReconciliationResult result = reconciler.revoke(targetAssignment,
remainingAssignmentTime);
+ remainingAssignmentTime.update();
+
+ if (result == ReconciliationResult.COMPLETED) {
+ // If we've revoked one or more partitions, we need to send an
acknowledgement request ASAP to
+ // let the coordinator know that they've been removed locally.
+ return;
+ } else if (result == ReconciliationResult.EXPIRED) {
+ // TODO: what do we do here?
Review Comment:
I would say we should aim for keeping only state updates and member info in
the MembershipMgr, not the logic for triggering the `onPartitionsLost`. It
could be maybe the assignment reconciler that, when it finishes processing,
triggers a HB or a partitionsLost depending on the state (if state is still
reconciling and it finished processing ok we just need to HB, if state is
fenced then we don't HB. We trigger onPartitionsLost and then trigger HB to
rejoin).
Seems all like interaction between the AssginemntReconciler and the HB
request, only checking the state to get the current state and determine what to
do (and eventually updating the state with the outcome)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]