lianetm commented on code in PR #15579: URL: https://github.com/apache/kafka/pull/15579#discussion_r1535917633
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1010,49 +1015,55 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
         // and assignment, executed sequentially).
         CompletableFuture<Void> reconciliationResult =
             revocationResult.thenCompose(__ -> {
-                boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
-                if (state == MemberState.RECONCILING && !memberHasRejoined) {
+                if (!maybeAbortReconciliation()) {
                     // Apply assignment
                     return assignPartitions(assignedTopicIdPartitions, addedPartitions);
                 } else {
-                    log.debug("Revocation callback completed but the member already " +
-                        "transitioned out of the reconciling state for epoch {} into " +
-                        "{} state with epoch {}. Interrupting reconciliation as it's " +
-                        "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
-                    String reason = interruptedReconciliationErrorMessage();
                     CompletableFuture<Void> res = new CompletableFuture<>();
                     res.completeExceptionally(new KafkaException("Interrupting reconciliation" +
-                        " after revocation. " + reason));
+                        " after revocation. " + interruptedReconciliationReason()));
                     return res;
                 }
             });

         reconciliationResult.whenComplete((result, error) -> {
-            markReconciliationCompleted();
             if (error != null) {
                 // Leaving member in RECONCILING state after callbacks fail. The member
                 // won't send the ack, and the expectation is that the broker will kick the
                 // member out of the group after the rebalance timeout expires, leading to a
                 // RECONCILING -> FENCED transition.
                 log.error("Reconciliation failed.", error);
             } else {
-                if (state == MemberState.RECONCILING) {
+                if (!maybeAbortReconciliation()) {
                     currentAssignment = resolvedAssignment;

                     // Reschedule the auto commit starting from now that the member has a new assignment.
                     commitRequestManager.resetAutoCommitTimer();

                     // Make assignment effective on the broker by transitioning to send acknowledge.
                     transitionTo(MemberState.ACKNOWLEDGING);
-                } else {
-                    String reason = interruptedReconciliationErrorMessage();
-                    log.error("Interrupting reconciliation after partitions assigned callback " +
-                        "completed. " + reason);
                 }
             }
+            markReconciliationCompleted();
         });
     }

+    /**
+     * @return True if the reconciliation in progress should not continue. This could be because
+     * the member is not in RECONCILING state anymore (member failed or is leaving the group), or
+     * if it has rejoined the group (note that after rejoining the member could be RECONCILING
+     * again, so checking the state is not enough)
+     */
+    boolean maybeAbortReconciliation() {
+        boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
+        if (shouldAbort) {
+            String reason = interruptedReconciliationReason();
+            log.error("Interrupting reconciliation because " + reason);
+            markReconciliationCompleted();

Review Comment:
   The stale one. We only have a single `reconciliationInProgress` at a time, and that is the one that gets aborted, so it will always be the stale one. To complete the story: we keep the new target to reconcile, and on the next poll the new reconciliation is triggered. All tests related to this case end with [assertInitialReconciliationDiscardedAfterRejoin](https://github.com/apache/kafka/blob/415092d430115cdaec4f0c70bddca7f93d5a0958/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java#L2616), which validates that the new assignment received after rejoining is kept as the target even after the stale reconciliation is discarded. I just extended it to show how the next poll triggers the new reconciliation.
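
   To make the sequence above concrete, here is a minimal, self-contained sketch of the intended behaviour. It is not the actual `MembershipManagerImpl`: the class `ReconciliationSketch`, the helpers `receiveTarget`, `poll` and `completeCallbacks`, and the use of plain strings for partitions are all assumptions made for illustration, and the rebalance callbacks are simulated synchronously rather than through `CompletableFuture`. Only `maybeAbortReconciliation`, `markReconciliationCompleted`, `reconciliationInProgress` and `rejoinedWhileReconciliationInProgress` mirror names from the diff.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the flow described in the review comment.
// NOT the real MembershipManagerImpl: callbacks are simulated synchronously.
public class ReconciliationSketch {
    enum MemberState { RECONCILING, ACKNOWLEDGING }

    MemberState state = MemberState.RECONCILING;
    boolean reconciliationInProgress = false;
    boolean rejoinedWhileReconciliationInProgress = false;
    Set<String> currentAssignment = new TreeSet<>();
    Set<String> targetAssignment = new TreeSet<>();
    // Snapshot of the target taken when the in-flight reconciliation started.
    private Set<String> resolvedAssignment = new TreeSet<>();

    // Hypothetical helper: a new target arrives, either on first join or after a rejoin.
    void receiveTarget(Set<String> target, boolean rejoined) {
        targetAssignment = new TreeSet<>(target);
        state = MemberState.RECONCILING;
        if (rejoined && reconciliationInProgress) {
            // The in-flight reconciliation is now stale, even though state is RECONCILING again.
            rejoinedWhileReconciliationInProgress = true;
        }
    }

    // Hypothetical helper: starts a reconciliation only if none is in flight.
    void poll() {
        boolean needsReconciliation = !targetAssignment.equals(currentAssignment);
        if (state == MemberState.RECONCILING && !reconciliationInProgress && needsReconciliation) {
            reconciliationInProgress = true;
            resolvedAssignment = new TreeSet<>(targetAssignment);
        }
    }

    // Hypothetical helper: stands in for the revocation/assignment callbacks completing.
    void completeCallbacks() {
        if (!reconciliationInProgress || maybeAbortReconciliation()) {
            return;
        }
        currentAssignment = resolvedAssignment;
        state = MemberState.ACKNOWLEDGING;
        markReconciliationCompleted();
    }

    // Same condition as in the diff: state alone is not enough after a rejoin.
    boolean maybeAbortReconciliation() {
        boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
        if (shouldAbort) {
            markReconciliationCompleted();
        }
        return shouldAbort;
    }

    void markReconciliationCompleted() {
        reconciliationInProgress = false;
        rejoinedWhileReconciliationInProgress = false;
    }

    public static void main(String[] args) {
        ReconciliationSketch m = new ReconciliationSketch();
        m.receiveTarget(Set.of("t-0"), false);
        m.poll();                              // reconciliation for t-0 starts
        m.receiveTarget(Set.of("t-1"), true);  // member rejoins while it is in flight
        m.completeCallbacks();                 // stale reconciliation is discarded
        System.out.println("after abort: current=" + m.currentAssignment
            + " target=" + m.targetAssignment);
        m.poll();                              // next poll starts the new reconciliation
        m.completeCallbacks();
        System.out.println("after next poll: current=" + m.currentAssignment
            + " state=" + m.state);
    }
}
```

   The point of the sketch is that aborting only clears the in-flight flags; `targetAssignment` is left untouched, which is why the next `poll()` can pick up the assignment received after rejoining.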