lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394322340


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -260,42 +924,52 @@ public Optional<String> serverAssignor() {
      * {@inheritDoc}
      */
     @Override
-    public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+    public Set<TopicPartition> currentAssignment() {
         return this.currentAssignment;
     }
 
 
     /**
-     * @return Assignment that the member received from the server but hasn't completely processed
-     * yet. Visible for testing.
+     * @return Set of topic IDs received in a target assignment that have not been reconciled yet
+     * because topic names are not in metadata. Visible for testing.
      */
-    Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() {
-        return targetAssignment;
+    Set<Uuid> topicsWaitingForMetadata() {
+        return Collections.unmodifiableSet(assignmentUnresolved.keySet());
     }
 
     /**
-     * This indicates that the reconciliation of the target assignment has been successfully
-     * completed, so it will make it effective by assigning it to the current assignment.
-     *
-     * @params Assignment that has been successfully reconciled. This is expected to
-     * match the target assignment defined in {@link #targetAssignment()}
+     * @return Topic partitions received in a target assignment that have been resolved in
+     * metadata and are ready to be reconciled. Visible for testing.
+     */
+    Set<TopicPartition> assignmentReadyToReconcile() {
+        return Collections.unmodifiableSet(assignmentReadyToReconcile);
+    }
+
+    /**
+     * @return If there is a reconciliation in process now. Note that reconciliation is triggered
+     * by a call to {@link #reconcile()}. Visible for testing.
+     */
+    boolean reconciliationInProgress() {
+        return reconciliationInProgress;
+    }
+
+    /**
+     * When cluster metadata is updated, try to resolve topic names for topic IDs received in
+     * assignment that hasn't been resolved yet.
+     * <ul>
+     *     <li>Try to find topic names for all unresolved assignments</li>
+     *     <li>Add discovered topic names to the local topic names cache</li>
+     *     <li>If any topics are resolved, trigger a reconciliation process</li>
+     *     <li>If some topics still remain unresolved, request another metadata update</li>
+     * </ul>
      */
     @Override
-    public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-        if (assignment == null) {
-            throw new IllegalArgumentException("Assignment cannot be null");
-        }
-        if (!assignment.equals(targetAssignment.orElse(null))) {
-            // This could be simplified to remove the assignment param and just assume that what
-            // was reconciled was the targetAssignment, but keeping it explicit and failing fast
-            // here to uncover any issues in the interaction of the assignment processing logic
-            // and this.
-            throw new IllegalStateException(String.format("Reconciled assignment %s does not " +
-                            "match the expected target assignment %s", assignment,
-                    targetAssignment.orElse(null)));
+    public void onUpdate(ClusterResource clusterResource) {
+        resolveMetadataForUnresolvedAssignment();
+        if (!assignmentReadyToReconcile.isEmpty()) {
+            // TODO: improve reconciliation triggering. Initial approach of triggering on every
+            //  HB response and metadata update.

Review Comment:
   Filed [KAFKA-15832](https://issues.apache.org/jira/browse/KAFKA-15832) for this and I will take care of it right after this PR as a follow-up.
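
   For readers following the thread, the flow described in the `onUpdate` Javadoc in the hunk above can be sketched roughly as below. This is a simplified, hypothetical illustration and not the code in this PR: the class name `MetadataResolutionSketch`, the use of plain strings in place of `Uuid` and `TopicPartition`, the `topicNamesCache`, `lastReconciled`, `reconcileCalls` and `metadataUpdateRequested` members, and the synchronous `reconcile()` are all assumptions made to keep the example self-contained. Only `assignmentUnresolved`, `assignmentReadyToReconcile`, `topicsWaitingForMetadata()` and `onUpdate` echo names that appear in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of "resolve topic IDs on metadata update, then reconcile". */
public class MetadataResolutionSketch {
    // Topic ID -> partitions whose topic name is not yet known (assumed shape).
    private final Map<String, List<Integer>> assignmentUnresolved = new HashMap<>();
    // Resolved "topicName-partition" entries waiting to be reconciled.
    private final Set<String> assignmentReadyToReconcile = new TreeSet<>();
    // Local cache of topic ID -> topic name (assumed helper).
    private final Map<String, String> topicNamesCache = new HashMap<>();
    private Set<String> lastReconciled = new TreeSet<>();
    private boolean metadataUpdateRequested = false;
    private int reconcileCalls = 0;

    /** Records partitions received in a target assignment for a topic ID with no known name. */
    public void addUnresolved(String topicId, List<Integer> partitions) {
        assignmentUnresolved.computeIfAbsent(topicId, k -> new ArrayList<>()).addAll(partitions);
    }

    /** Stand-in for the metadata callback; the argument maps topic ID to topic name. */
    public void onUpdate(Map<String, String> metadataTopicNames) {
        Iterator<Map.Entry<String, List<Integer>>> it = assignmentUnresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<Integer>> entry = it.next();
            String name = metadataTopicNames.get(entry.getKey());
            if (name == null) {
                continue; // still unresolved, keep waiting for metadata
            }
            topicNamesCache.put(entry.getKey(), name);
            for (int partition : entry.getValue()) {
                assignmentReadyToReconcile.add(name + "-" + partition);
            }
            it.remove();
        }
        if (!assignmentReadyToReconcile.isEmpty()) {
            reconcile();
        }
        // Ask for another metadata update only while something remains unresolved.
        metadataUpdateRequested = !assignmentUnresolved.isEmpty();
    }

    // Simplification: completes immediately, unlike an asynchronous reconciliation.
    private void reconcile() {
        reconcileCalls++;
        lastReconciled = new TreeSet<>(assignmentReadyToReconcile);
        assignmentReadyToReconcile.clear();
    }

    public Set<String> topicsWaitingForMetadata() {
        return Collections.unmodifiableSet(assignmentUnresolved.keySet());
    }

    public Set<String> lastReconciled() {
        return Collections.unmodifiableSet(lastReconciled);
    }

    public boolean metadataUpdateRequested() {
        return metadataUpdateRequested;
    }

    public int reconcileCalls() {
        return reconcileCalls;
    }
}
```

   In this sketch every metadata update that resolves at least one topic triggers a reconciliation, which mirrors the initial approach mentioned in the TODO.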


