lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1525940513
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -911,9 +911,13 @@ void maybeReconcile() { SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); - if (resolvedAssignment.equals(currentAssignment)) { - log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " + - "is equal to the member current assignment.", resolvedAssignment); + if (currentAssignment != LocalAssignmentImpl.NONE && + resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 && + resolvedAssignment.partitions.equals(currentAssignment.partitions)) { + log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " + Review Comment: I didn't want to talk about the local epoch here, since it's more of implementation detail how to detect intermediate assignments. But then I should log the local epoch I supposed. Updated it accordingly. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() { verifyReconciliationNotTriggered(membershipManager); membershipManager.poll(time.milliseconds()); + membershipManager.onHeartbeatRequestSent(); Review Comment: Done ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated)); } - private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated, + private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation, Review Comment: Done ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -976,6 +974,10 @@ void maybeReconcile() { } revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions); + }).whenComplete((__, error) -> { + if (error != null) { + log.error("Reconciliation failed.", error); Review Comment: Nope, the exception handling inside `revokeAndAssign` is only triggered if the revoke and assign future fails. If `revokeAndAssign` fails outside the future (in particular, inside `revokePartitions`, there is logic that may fail), the exception falls through to here and was silently swallowed before, which cost me an hour of debugging to find. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org