lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394322340
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -260,42 +924,52 @@ public Optional<String> serverAssignor() { * {@inheritDoc} */ @Override - public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { + public Set<TopicPartition> currentAssignment() { return this.currentAssignment; } /** - * @return Assignment that the member received from the server but hasn't completely processed - * yet. Visible for testing. + * @return Set of topic IDs received in a target assignment that have not been reconciled yet + * because topic names are not in metadata. Visible for testing. */ - Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() { - return targetAssignment; + Set<Uuid> topicsWaitingForMetadata() { + return Collections.unmodifiableSet(assignmentUnresolved.keySet()); } /** - * This indicates that the reconciliation of the target assignment has been successfully - * completed, so it will make it effective by assigning it to the current assignment. - * - * @params Assignment that has been successfully reconciled. This is expected to - * match the target assignment defined in {@link #targetAssignment()} + * @return Topic partitions received in a target assignment that have been resolved in + * metadata and are ready to be reconciled. Visible for testing. + */ + Set<TopicPartition> assignmentReadyToReconcile() { + return Collections.unmodifiableSet(assignmentReadyToReconcile); + } + + /** + * @return If there is a reconciliation in process now. Note that reconciliation is triggered + * by a call to {@link #reconcile()}. Visible for testing. + */ + boolean reconciliationInProgress() { + return reconciliationInProgress; + } + + /** + * When cluster metadata is updated, try to resolve topic names for topic IDs received in + * assignment that hasn't been resolved yet. + * <ul> + * <li>Try to find topic names for all unresolved assignments</li> + * <li>Add discovered topic names to the local topic names cache</li> + * <li>If any topics are resolved, trigger a reconciliation process</li> + * <li>If some topics still remain unresolved, request another metadata update</li> + * </ul> */ @Override - public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - if (assignment == null) { - throw new IllegalArgumentException("Assignment cannot be null"); - } - if (!assignment.equals(targetAssignment.orElse(null))) { - // This could be simplified to remove the assignment param and just assume that what - // was reconciled was the targetAssignment, but keeping it explicit and failing fast - // here to uncover any issues in the interaction of the assignment processing logic - // and this. - throw new IllegalStateException(String.format("Reconciled assignment %s does not " + - "match the expected target assignment %s", assignment, - targetAssignment.orElse(null))); + public void onUpdate(ClusterResource clusterResource) { + resolveMetadataForUnresolvedAssignment(); + if (!assignmentReadyToReconcile.isEmpty()) { + // TODO: improve reconciliation triggering. Initial approach of triggering on every + // HB response and metadata update. Review Comment: Filed [KAFKA-15832](https://issues.apache.org/jira/browse/KAFKA-15832) for this and I will take care of it right after this PR as a follow-up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org