lianetm commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1344659132


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java:
##########
@@ -85,122 +111,162 @@ enum ReconciliationResult {
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private Optional<RebalanceCallbackEvent> inflightCallback;
 
-    public MemberAssignmentReconciler(LogContext logContext,
-                                      SubscriptionState subscriptions,
-                                      ConsumerMetadata metadata,
-                                      BlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
+    AssignmentReconciler(LogContext logContext,
+                         SubscriptionState subscriptions,
+                         ConsumerMetadata metadata,
+                         BlockingQueue<BackgroundEvent> backgroundEventQueue) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.metadata = metadata;
         this.backgroundEventQueue = backgroundEventQueue;
     }
 
     /**
-     * Perform the revocation process, if necessary, depending on the given 
{@link Assignment target assignment}. If the
-     * {@link SubscriptionState#assignedPartitions() current set of assigned 
partitions} includes entries that are
-     * <em>not</em> in the target assignment, these will be considered for 
revocation. If there is already a
-     * reconciliation in progress (revocation or assignment), this method will 
return without performing any
-     * revocation.
+     * Perform the reconciliation process, as necessary to meet the given 
{@link Assignment target assignment}. Note
+     * that the reconciliation is a multi-step process, and this method should 
be invoked on each heartbeat if
+     * the coordinator provides a {@link Assignment target assignment}.
      *
      * @param assignment Target {@link Assignment}
      * @return {@link ReconciliationResult}
      */
-    ReconciliationResult revoke(Optional<Assignment> assignment) {
+    ReconciliationResult maybeReconcile(Optional<Assignment> assignment) {
         // Check for any outstanding operations first. If a conclusive result 
has already been reached, return that
         // before processing any further.
-        Optional<ReconciliationResult> inflightStatus = checkInflightStatus();
+        if (inflightCallback.isPresent()) {
+            // We don't actually need the _result_ of the event, just to know 
that it's complete.
+            if (inflightCallback.get().future().isDone()) {
+                // This is the happy path--we completed the callback. Clear 
out our inflight callback first, though.
+                inflightCallback = Optional.empty();

Review Comment:
   What about the case where the callback execution does not complete? We need 
to somehow clear this I expect. (If the callback does not complete, the broker 
will eventually kick the member out of the group, the member will rejoin so 
there should be a new `inflightCallback`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to