lucasbru commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1469529850


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -208,22 +209,18 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
     private final Map<Uuid, String> assignedTopicNamesCache;
 
     /**
-     * Topic IDs received in a target assignment for which we haven't found topic names yet.
-     * Items are added to this set every time a target assignment is received. Items are removed
-     * when metadata is found for the topic. This is where the member collects all assignments
-     * received from the broker, even though they may not be ready to reconcile due to missing
+     * Topic IDs received in the last target assignment. Items are added to this set every time a

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1010,17 +1000,24 @@ private void resolveMetadataForUnresolvedAssignment() {
             Optional<String> nameFromMetadata = findTopicNameInGlobalOrLocalCache(topicId);
             nameFromMetadata.ifPresent(resolvedTopicName -> {
                 // Name resolved, so assignment is ready for reconciliation.
-                addToAssignmentReadyToReconcile(topicId, resolvedTopicName, topicPartitions);
+                topicPartitions.forEach(tp -> {
+                    TopicIdPartition topicIdPartition = new TopicIdPartition(
+                        topicId,
+                            new TopicPartition(resolvedTopicName, tp));

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -674,6 +662,7 @@ void transitionToSendingLeaveGroup() {
                 ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
         updateMemberEpoch(leaveEpoch);
         currentAssignment = new HashMap<>();
+        targetAssignmentReconciled = currentAssignment.equals(currentTargetAssignment);

Review Comment:
   As for the first point - I don't think it will cause a problem, but you are 
right that the name of the boolean is not 100% precise in this case.
   
   I introduced it mostly to save the overhead of having to compare two sets 
after every heartbeat. We can also not cache this for now, and see if we need 
to add it back later on. Premature optimization yadayada, I guess ;). I'll 
remove it for now.
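
A minimal sketch of the uncached alternative discussed above, where the "reconciled" state is derived by comparing the two assignments on demand instead of being stored in a boolean. The class `AssignmentState` and its methods are hypothetical stand-ins, and `java.util.UUID` replaces Kafka's `Uuid` so the example compiles on its own; it is not the code from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical stand-in for the assignment state kept by the membership manager.
class AssignmentState {
    private Map<UUID, SortedSet<Integer>> currentAssignment = new HashMap<>();
    private Map<UUID, SortedSet<Integer>> currentTargetAssignment = new HashMap<>();

    // Called when a new target assignment arrives (hypothetical method name).
    void updateTarget(Map<UUID, SortedSet<Integer>> target) {
        currentTargetAssignment = copy(target);
    }

    // Called when a reconciliation completes (hypothetical method name).
    void updateCurrent(Map<UUID, SortedSet<Integer>> assignment) {
        currentAssignment = copy(assignment);
    }

    // Mirrors the reset of currentAssignment when leaving the group.
    void clearCurrent() {
        currentAssignment = new HashMap<>();
    }

    // Derived on every call, so there is no cached flag that can go stale
    // or whose name could be imprecise in edge cases.
    boolean targetAssignmentReconciled() {
        return currentAssignment.equals(currentTargetAssignment);
    }

    // Defensive deep copy so later changes by the caller do not leak in.
    private static Map<UUID, SortedSet<Integer>> copy(Map<UUID, SortedSet<Integer>> source) {
        Map<UUID, SortedSet<Integer>> result = new HashMap<>();
        source.forEach((topicId, partitions) -> result.put(topicId, new TreeSet<>(partitions)));
        return result;
    }
}
```

The trade-off is the one named in the comment: each check costs a map comparison, which is only worth caching if it turns out to matter in practice.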



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -997,11 +985,13 @@ void markReconciliationCompleted() {
      *     </li>
      * </ol>
      */
-    private void resolveMetadataForUnresolvedAssignment() {
-        assignmentReadyToReconcile.clear();
+    private SortedSet<TopicIdPartition> resolveMetadataForTargetAssignment() {
+        final SortedSet<TopicIdPartition> assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
+        final HashMap<Uuid, SortedSet<Integer>> unresolved = new HashMap<>(currentTargetAssignment);
+
         // Try to resolve topic names from metadata cache or subscription cache, and move
         // assignments from the "unresolved" collection, to the "readyToReconcile" one.

Review Comment:
   I only updated the variable name `assignmentReadyToReconcile`. Otherwise, I 
think the comment is still valid.
   
   I personally prefer to just have "why" comments and not "what" comments, so 
I'd be open to dropping the comment as well.
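
For context, a rough sketch of what the quoted comment describes: start with every topic of the target assignment in an "unresolved" map, and move each topic whose name is known into a sorted "ready to reconcile" set. Everything here is an assumption made for illustration: `TargetAssignmentResolver`, `ResolvedPartition`, the comparator, and the plain `Map<UUID, String>` name cache are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

class TargetAssignmentResolver {
    // Hypothetical stand-in for a resolved topic-ID partition.
    static final class ResolvedPartition {
        final UUID topicId;
        final String topicName;
        final int partition;

        ResolvedPartition(UUID topicId, String topicName, int partition) {
            this.topicId = topicId;
            this.topicName = topicName;
            this.partition = partition;
        }
    }

    // Hypothetical ordering: by topic name, then by partition number.
    static final Comparator<ResolvedPartition> BY_NAME_THEN_PARTITION =
        Comparator.comparing((ResolvedPartition p) -> p.topicName)
                  .thenComparingInt(p -> p.partition);

    // Returns the partitions whose topic name is known. On return, `unresolved`
    // holds exactly the topics of the target assignment that have no known name.
    static SortedSet<ResolvedPartition> resolve(
            Map<UUID, SortedSet<Integer>> targetAssignment,
            Map<UUID, String> knownTopicNames,
            Map<UUID, SortedSet<Integer>> unresolved) {
        SortedSet<ResolvedPartition> ready = new TreeSet<>(BY_NAME_THEN_PARTITION);
        unresolved.clear();
        unresolved.putAll(targetAssignment);
        targetAssignment.forEach((topicId, partitions) -> {
            String name = knownTopicNames.get(topicId);
            if (name != null) {
                // Name resolved: move this topic from "unresolved" to "ready".
                partitions.forEach(p -> ready.add(new ResolvedPartition(topicId, name, p)));
                unresolved.remove(topicId);
            }
        });
        return ready;
    }
}
```

Because the result is recomputed from the full target assignment on each call, the caller does not need to keep a separate long-lived "ready to reconcile" collection in sync.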



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1340,18 +1336,15 @@ boolean reconciliationInProgress() {
      * When cluster metadata is updated, try to resolve topic names for topic IDs received in
      * assignment that hasn't been resolved yet.
      * <ul>
-     *     <li>Try to find topic names for all unresolved assignments</li>
+     *     <li>Try to find topic names for all assignments</li>
      *     <li>Add discovered topic names to the local topic names cache</li>
      *     <li>If any topics are resolved, trigger a reconciliation process</li>
      *     <li>If some topics still remain unresolved, request another metadata update</li>
      * </ul>
      */
     @Override
     public void onUpdate(ClusterResource clusterResource) {
-        resolveMetadataForUnresolvedAssignment();
-        if (!assignmentReadyToReconcile.isEmpty()) {
-            reconcile();
-        }
+        reconcile();

Review Comment:
   Yeah, that's better. Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1311,21 +1294,34 @@ public Map<Uuid, SortedSet<Integer>> currentAssignment() {
         return this.currentAssignment;
     }
 
-
     /**
      * @return Set of topic IDs received in a target assignment that have not been reconciled yet
-     * because topic names are not in metadata. Visible for testing.
+     * because topic names are not in metadata or reconciliation hasn't finished. Visible for testing.
      */
-    Set<Uuid> topicsWaitingForMetadata() {
-        return Collections.unmodifiableSet(assignmentUnresolved.keySet());
+    Set<Uuid> topicsAwaitingReconciliation() {

Review Comment:
   You are right! This is confusing. Fixed it as you suggested.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -766,23 +755,28 @@ public void transitionToStale() {
     }
 
     /**
-     * Reconcile the assignment that has been received from the server and for which topic names
-     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed,
-     * trigger the callbacks and update the subscription state. Note that only one reconciliation
+     * Reconcile the assignment that has been received from the server. If for some topics, the
+     * topic ID cannot be matched to a topic name, a metadata update will be triggered and only
+     * the subset of topics that are resolvable will be reconciled. Reconcilation will trigger the

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
