lianetm commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1333187091
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
}
maybeTransitionToStable();
}
+
+ /**
+ * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * <ol>
+ * <li>Revoke partitions, if any</li>
+ * <li>Heartbeat to acknowledge revoked partitions</li>
+ * <li>Assign partitions, if any</li>
+ * <li>Heartbeat to acknowledge assigned partitions</li>
+ * </ol>
+ *
+ * TODO: What are the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat request?
+ * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()?
+ */
+ void reconcile() {
+ transitionTo(MemberState.RECONCILING);
+
+ Timer remainingAssignmentTime = time.timer(10000);
+
+ // First, we need to determine if any partitions need to be revoked.
+ {
+ ReconciliationResult result = reconciler.revoke(targetAssignment, remainingAssignmentTime);
+ remainingAssignmentTime.update();
+
+ if (result == ReconciliationResult.COMPLETED) {
+ // If we've revoked one or more partitions, we need to send an acknowledgement request ASAP to
Review Comment:
I get your dilemma. I think the root cause relates to my comment above:
this whole reconcile logic needs to be driven by the HB, not by the state.
What if we define the components as follows:
- HeartbeatReqMgr: responsible for preparing and timing HB requests, and
  for handling responses. Handling responses involves:
  - updating shared state based on HB responses (calling MembershipMgr)
  - triggering assignment processing (calling AssignmentReconciler), which
    will directly affect the timing of the HB requests
- AssignmentReconciler: responsible for processing an assignment (invoking
  callbacks and updating shared state)
- MembershipMgr: responsible for keeping member info and state. This is
  mainly shared state between the HeartbeatReqMgr and the
  AssignmentReconciler, nothing more.
The HeartbeatReqMgr is the central player, which makes sense because
everything is based on the HB requests. It relies on the
AssignmentReconciler (to delegate assignment processing) and on the
MembershipMgr (just to keep shared info in one place).
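
To make the split above a bit more concrete, here is a rough sketch of how the three components could relate. Every class, field, and method signature in it is hypothetical and made up for illustration; none of it is the existing client code. It also collapses the revoke/ack/assign/ack sequence into a single step, so treat it as a picture of who calls whom rather than a proposed implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch only: names and signatures are assumptions, not the real client API.
class HeartbeatDrivenSketch {

    /** Shared member info and state. Holds data, makes no decisions. */
    static class MembershipMgr {
        String memberId = "";
        int memberEpoch = 0;
        final Set<Integer> ownedPartitions = new TreeSet<>();
    }

    /** Processes a target assignment: invokes callbacks and updates shared state. */
    static class AssignmentReconciler {
        private final MembershipMgr membership;
        // Stand-in for ConsumerRebalanceListener callbacks.
        final List<String> callbackLog = new ArrayList<>();

        AssignmentReconciler(MembershipMgr membership) {
            this.membership = membership;
        }

        /** Returns true if the owned partitions changed and should be acknowledged. */
        boolean process(Set<Integer> target) {
            Set<Integer> revoked = new TreeSet<>(membership.ownedPartitions);
            revoked.removeAll(target);
            Set<Integer> added = new TreeSet<>(target);
            added.removeAll(membership.ownedPartitions);

            if (!revoked.isEmpty()) {
                callbackLog.add("revoked " + revoked);
                membership.ownedPartitions.removeAll(revoked);
            }
            if (!added.isEmpty()) {
                callbackLog.add("assigned " + added);
                membership.ownedPartitions.addAll(added);
            }
            return !revoked.isEmpty() || !added.isEmpty();
        }
    }

    /** Central player: handles HB responses and decides when the next HB goes out. */
    static class HeartbeatReqMgr {
        private final MembershipMgr membership;
        private final AssignmentReconciler reconciler;
        boolean sendNextHeartbeatImmediately = false;

        HeartbeatReqMgr(MembershipMgr membership, AssignmentReconciler reconciler) {
            this.membership = membership;
            this.reconciler = reconciler;
        }

        void onResponse(String memberId, int memberEpoch, Set<Integer> targetAssignment) {
            // 1. Update shared state from the response.
            membership.memberId = memberId;
            membership.memberEpoch = memberEpoch;
            // 2. Delegate assignment processing; its outcome drives the HB timing.
            sendNextHeartbeatImmediately = reconciler.process(targetAssignment);
        }
    }

    public static void main(String[] args) {
        MembershipMgr membership = new MembershipMgr();
        AssignmentReconciler reconciler = new AssignmentReconciler(membership);
        HeartbeatReqMgr heartbeat = new HeartbeatReqMgr(membership, reconciler);

        heartbeat.onResponse("member-1", 1, Set.of(0, 1));
        heartbeat.onResponse("member-1", 2, Set.of(1, 2));
        System.out.println(reconciler.callbackLog);
        System.out.println("owned=" + membership.ownedPartitions
                + " ackNow=" + heartbeat.sendNextHeartbeatImmediately);
    }
}
```

The point of the sketch is the direction of the dependencies: only the HeartbeatReqMgr reacts to responses, and the MembershipMgr never triggers anything on its own.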
(Let's have a chat in our sync)