lucasbru commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1472523826


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1350,21 +1321,33 @@ public Map<Uuid, SortedSet<Integer>> 
currentAssignment() {
         return this.currentAssignment;
     }
 
-
     /**
      * @return Set of topic IDs received in a target assignment that have not 
been reconciled yet
-     * because topic names are not in metadata. Visible for testing.
+     * because topic names are not in metadata or reconciliation hasn't 
finished. Reconciliation
+     * hasn't finished for a topic if the currently active assignment has a 
different set of partitions
+     * for the topic than the target assignment.
+     *
+     * Visible for testing.
      */
-    Set<Uuid> topicsWaitingForMetadata() {
-        return Collections.unmodifiableSet(assignmentUnresolved.keySet());
+    Set<Uuid> topicsAwaitingReconciliation() {
+        return topicPartitionsAwaitingReconciliation().keySet();
     }
 
     /**
-     * @return Topic partitions received in a target assignment that have been 
resolved in
-     * metadata and are ready to be reconciled. Visible for testing.
+     * @return Map of topics partitions received in a target assignment that 
have not been
+     * reconciled yet because topic names are not in metadata or 
reconciliation hasn't finished.
+     * The value will always contain all partitions in the target assignment.
+     *
+     * Visible for testing.
      */
-    Set<TopicIdPartition> assignmentReadyToReconcile() {
-        return Collections.unmodifiableSet(assignmentReadyToReconcile);
+    Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
+        final Map<Uuid, SortedSet<Integer>> topicPartitionMap = new 
HashMap<>();
+        currentTargetAssignment.forEach((x, y) -> {
+            if (!currentAssignment.containsKey(x) || 
!currentAssignment.get(x).equals(y)) {

Review Comment:
   Yes, it's only used for testing. This method is a replacement for 
`assignmentReadyToReconcile` which was also used only in testing. You are right 
that semantics were a bit weird, I implemented so that it replicates the 
behavior of the (old) internal state. I changed it to have more reasonable 
semantics (the value contains only the partitions that are missing from the 
currently reconciled assignment). For the existing tests, there is no 
difference between the two semantics.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1032,17 +1011,24 @@ private void resolveMetadataForUnresolvedAssignment() {
             Optional<String> nameFromMetadata = 
findTopicNameInGlobalOrLocalCache(topicId);
             nameFromMetadata.ifPresent(resolvedTopicName -> {
                 // Name resolved, so assignment is ready for reconciliation.
-                addToAssignmentReadyToReconcile(topicId, resolvedTopicName, 
topicPartitions);
+                topicPartitions.forEach(tp -> {
+                    TopicIdPartition topicIdPartition = new TopicIdPartition(
+                        topicId,
+                        new TopicPartition(resolvedTopicName, tp));

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -788,23 +767,28 @@ public void transitionToStale() {
     }
 
     /**
-     * Reconcile the assignment that has been received from the server and for 
which topic names
-     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
-     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * Reconcile the assignment that has been received from the server. If for 
some topics, the
+     * topic ID cannot be matched to a topic name, a metadata update will be 
triggered and only
+     * the subset of topics that are resolvable will be reconciled. 
Reconciliation will trigger the
+     * callbacks and update the subscription state. Note that only one 
reconciliation
      * can be in progress at a time. If there is already another one in 
progress when this is
      * triggered, it will be no-op, and the assignment will be reconciled on 
the next
      * reconciliation loop.
      */
-    boolean reconcile() {
+    void reconcile() {
+        if (targetAssignmentReconciled()) {
+            log.debug("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
+                    "current assignment.");
+            return;
+        }
         if (reconciliationInProgress) {
             log.debug("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
-                    assignmentReadyToReconcile + " will be handled in the next 
reconciliation loop.");
-            return false;
+                currentTargetAssignment + " will be handled in the next 
reconciliation loop.");
+            return;
         }
 
-        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
-        SortedSet<TopicIdPartition> assignedTopicIdPartitions = new 
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
-        assignedTopicIdPartitions.addAll(assignmentReadyToReconcile);
+        // Resolve metadata for target assignment
+        SortedSet<TopicIdPartition> assignedTopicIdPartitions = 
resolveMetadataForTargetAssignment();

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to