lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1522901846
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -889,43 +914,36 @@ private void transitionToStale() {
      */
     void maybeReconcile() {
         if (targetAssignmentReconciled()) {
-            log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+            log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
                     "current assignment.");
             return;
         }
         if (reconciliationInProgress) {
-            log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+            log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
                 currentTargetAssignment + " will be handled in the next reconciliation loop.");
             return;
         }

         // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
         // if some topic IDs are not resolvable.
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+        final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);

-        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-        ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-        // Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
-        // being reconciled. Needed for interactions with the centralized subscription state that
-        // does not support topic IDs yet, and for the callbacks.
-        SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
-        // Check same assignment. Based on topic names for now, until topic IDs are properly
-        // supported in the centralized subscription state object. Note that this check is
-        // required to make sure that reconciliation is not triggered if the assignment ready to
-        // be reconciled is the same as the current one (even though the member may remain
-        // in RECONCILING state if it has some unresolved assignments).
-        boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
-        if (sameAssignmentReceived) {
+        if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   I guess we could do it, but do we have to, and is it the best thing to do? It seems like a corner case to me, and maybe the easiest and cleanest behavior is to just run a full reconciliation, because the assignment changed in the meantime, even if our client never managed to reconcile the intermediate assignment. Users seem to rely on the ConsumerRebalanceListener to detect rebalances (as do our system tests). The case you are describing sounds like a rebalance event to me, so I think we should call the listener.
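To make the corner case concrete, here is a minimal sketch of the behavior I have in mind. It is not the PR's code: `Assignment`, `updateWith` and `shouldReconcile` are hypothetical stand-ins, partitions are plain strings, and I am assuming that equality takes the local epoch into account and that the epoch is bumped on every new target. Under those assumptions, a target that goes A -> B -> A without B ever being reconciled still compares as different from the current assignment, so a full reconciliation (and the listener callbacks) would run.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch only; names and equality semantics are assumptions, not the PR's API.
class ReconcileSketch {

    /** Stand-in for an assignment: a local epoch plus a set of partition names. */
    static final class Assignment {
        final long localEpoch;
        final Set<String> partitions;

        Assignment(long localEpoch, Set<String> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = new TreeSet<>(partitions);
        }

        /** Assumed behavior: every newly received target bumps the local epoch. */
        Assignment updateWith(Set<String> newPartitions) {
            return new Assignment(localEpoch + 1, newPartitions);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Assignment)) return false;
            Assignment that = (Assignment) o;
            // Assumption: the epoch is part of equality, not just the partitions.
            return localEpoch == that.localEpoch && partitions.equals(that.partitions);
        }

        @Override
        public int hashCode() {
            return Objects.hash(localEpoch, partitions);
        }
    }

    /** Reconcile (and call the listener) unless the resolved target equals the current assignment. */
    static boolean shouldReconcile(Assignment current, Assignment resolvedTarget) {
        return !resolvedTarget.equals(current);
    }

    public static void main(String[] args) {
        Assignment current = new Assignment(1, Set.of("t-0"));
        // Intermediate target that the client never manages to reconcile.
        Assignment intermediate = current.updateWith(Set.of("t-0", "t-1"));
        // Latest target has the same partitions as the current assignment, but a newer epoch.
        Assignment latest = intermediate.updateWith(Set.of("t-0"));

        System.out.println(shouldReconcile(current, current)); // false
        System.out.println(shouldReconcile(current, latest));  // true
    }
}
```

If equality were based on partitions alone, the second call would return false and the rebalance would go unnoticed by the listener, which is the behavior I would rather avoid.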